http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/testdata/WordCountData.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/WordCountData.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/WordCountData.java
new file mode 100644
index 0000000..23c649b
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/WordCountData.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.testdata;
+
+
+/**
+ *
+ */
+public class WordCountData {
+
+       public static final String TEXT = "Goethe - Faust: Der Tragoedie erster 
Teil\n" + "Prolog im Himmel.\n"
+                       + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+                       + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+                       + "Und ihre vorgeschriebne Reise Vollendet sie mit 
Donnergang. Ihr Anblick\n"
+                       + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden 
mag; die unbegreiflich\n"
+                       + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+                       + "GABRIEL: Und schnell und unbegreiflich schnelle 
Dreht sich umher der Erde\n"
+                       + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+                       + "schaeumt das Meer in breiten Fluessen Am tiefen 
Grund der Felsen auf, Und\n"
+                       + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+                       + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer 
aufs Land, vom Land\n"
+                       + "aufs Meer, und bilden wuetend eine Kette Der 
tiefsten Wirkung rings umher.\n"
+                       + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+                       + "deine Boten, Herr, verehren Das sanfte Wandeln 
deines Tags.\n"
+                       + "ZU DREI: Der Anblick gibt den Engeln Staerke, Da 
keiner dich ergruenden\n"
+                       + "mag, Und alle deine hohen Werke Sind herrlich wie am 
ersten Tag.\n"
+                       + "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder 
nahst Und fragst, wie\n"
+                       + "alles sich bei uns befinde, Und du mich sonst 
gewoehnlich gerne sahst, So\n"
+                       + "siehst du mich auch unter dem Gesinde. Verzeih, ich 
kann nicht hohe Worte\n"
+                       + "machen, Und wenn mich auch der ganze Kreis 
verhoehnt; Mein Pathos braechte\n"
+                       + "dich gewiss zum Lachen, Haettst du dir nicht das 
Lachen abgewoehnt. Von\n"
+                       + "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe 
nur, wie sich die\n"
+                       + "Menschen plagen. Der kleine Gott der Welt bleibt 
stets von gleichem\n"
+                       + "Schlag, Und ist so wunderlich als wie am ersten Tag. 
Ein wenig besser\n"
+                       + "wuerd er leben, Haettst du ihm nicht den Schein des 
Himmelslichts gegeben;\n"
+                       + "Er nennt's Vernunft und braucht's allein, Nur 
tierischer als jedes Tier\n"
+                       + "zu sein. Er scheint mir, mit Verlaub von euer 
Gnaden, Wie eine der\n"
+                       + "langbeinigen Zikaden, Die immer fliegt und fliegend 
springt Und gleich im\n"
+                       + "Gras ihr altes Liedchen singt; Und laeg er nur noch 
immer in dem Grase! In\n"
+                       + "jeden Quark begraebt er seine Nase.\n"
+                       + "DER HERR: Hast du mir weiter nichts zu sagen? Kommst 
du nur immer\n"
+                       + "anzuklagen? Ist auf der Erde ewig dir nichts 
recht?\n"
+                       + "MEPHISTOPHELES: Nein Herr! ich find es dort, wie 
immer, herzlich\n"
+                       + "schlecht. Die Menschen dauern mich in ihren 
Jammertagen, Ich mag sogar\n"
+                       + "die armen selbst nicht plagen.\n" + "DER HERR: 
Kennst du den Faust?\n" + "MEPHISTOPHELES: Den Doktor?\n"
+                       + "DER HERR: Meinen Knecht!\n"
+                       + "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre 
Weise. Nicht irdisch\n"
+                       + "ist des Toren Trank noch Speise. Ihn treibt die 
Gaerung in die Ferne, Er\n"
+                       + "ist sich seiner Tollheit halb bewusst; Vom Himmel 
fordert er die schoensten\n"
+                       + "Sterne Und von der Erde jede hoechste Lust, Und alle 
Naeh und alle Ferne\n"
+                       + "Befriedigt nicht die tiefbewegte Brust.\n"
+                       + "DER HERR: Wenn er mir auch nur verworren dient, So 
werd ich ihn bald in\n"
+                       + "die Klarheit fuehren. Weiss doch der Gaertner, wenn 
das Baeumchen gruent, Das\n"
+                       + "Bluet und Frucht die kuenft'gen Jahre zieren.\n"
+                       + "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch 
verlieren! Wenn Ihr\n"
+                       + "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu 
fuehren.\n"
+                       + "DER HERR: Solang er auf der Erde lebt, So lange sei 
dir's nicht verboten,\n"
+                       + "Es irrt der Mensch so lang er strebt.\n"
+                       + "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten 
Hab ich mich niemals\n"
+                       + "gern befangen. Am meisten lieb ich mir die vollen, 
frischen Wangen. Fuer\n"
+                       + "einem Leichnam bin ich nicht zu Haus; Mir geht es 
wie der Katze mit der Maus.\n"
+                       + "DER HERR: Nun gut, es sei dir ueberlassen! Zieh 
diesen Geist von seinem\n"
+                       + "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, 
Auf deinem Wege mit\n"
+                       + "herab, Und steh beschaemt, wenn du bekennen musst: 
Ein guter Mensch, in\n"
+                       + "seinem dunklen Drange, Ist sich des rechten Weges 
wohl bewusst.\n"
+                       + "MEPHISTOPHELES: Schon gut! nur dauert es nicht 
lange. Mir ist fuer meine\n"
+                       + "Wette gar nicht bange. Wenn ich zu meinem Zweck 
gelange, Erlaubt Ihr mir\n"
+                       + "Triumph aus voller Brust. Staub soll er fressen, und 
mit Lust, Wie meine\n"
+                       + "Muhme, die beruehmte Schlange.\n"
+                       + "DER HERR: Du darfst auch da nur frei erscheinen; Ich 
habe deinesgleichen\n"
+                       + "nie gehasst. Von allen Geistern, die verneinen, ist 
mir der Schalk am\n"
+                       + "wenigsten zur Last. Des Menschen Taetigkeit kann 
allzu leicht erschlaffen,\n"
+                       + "er liebt sich bald die unbedingte Ruh; Drum geb ich 
gern ihm den Gesellen\n"
+                       + "zu, Der reizt und wirkt und muss als Teufel 
schaffen. Doch ihr, die echten\n"
+                       + "Goettersoehne, Erfreut euch der lebendig reichen 
Schoene! Das Werdende, das\n"
+                       + "ewig wirkt und lebt, Umfass euch mit der Liebe 
holden Schranken, Und was\n"
+                       + "in schwankender Erscheinung schwebt, Befestigt mit 
dauernden Gedanken!\n"
+                       + "(Der Himmel schliesst, die Erzengel verteilen 
sich.)\n"
+                       + "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich 
den Alten gern, Und\n"
+                       + "huete mich, mit ihm zu brechen. Es ist gar huebsch 
von einem grossen Herrn,\n"
+                       + "So menschlich mit dem Teufel selbst zu sprechen.";
+
+       public static final String COUNTS = "machen 1\n" + "zeit 2\n" + 
"heerscharen 1\n" + "keiner 2\n" + "meine 3\n"
+                       + "fuehr 1\n" + "triumph 1\n" + "kommst 1\n" + "frei 
1\n" + "schaffen 1\n" + "gesinde 1\n"
+                       + "langbeinigen 1\n" + "schalk 1\n" + "besser 1\n" + 
"solang 1\n" + "meer 4\n" + "fragst 1\n"
+                       + "gabriel 1\n" + "selbst 2\n" + "bin 1\n" + "sich 7\n" 
+ "du 11\n" + "sogar 1\n" + "geht 1\n"
+                       + "immer 4\n" + "mensch 2\n" + "befestigt 1\n" + "lebt 
2\n" + "mag 3\n" + "engeln 2\n" + "breiten 1\n"
+                       + "blitzendes 1\n" + "tags 1\n" + "sie 2\n" + "plagen 
2\n" + "allzu 1\n" + "meisten 1\n" + "o 1\n"
+                       + "pfade 1\n" + "kennst 1\n" + "nichts 3\n" + "gedanken 
1\n" + "befriedigt 1\n" + "mich 6\n" + "s 3\n"
+                       + "es 8\n" + "verneinen 1\n" + "er 13\n" + "gleich 1\n" 
+ "baeumchen 1\n" + "donnergang 1\n"
+                       + "wunderlich 1\n" + "reise 1\n" + "urquell 1\n" + 
"doch 3\n" + "aufs 2\n" + "toten 1\n" + "niemals 1\n"
+                       + "eine 2\n" + "hab 1\n" + "darfst 1\n" + "da 5\n" + 
"gen 1\n" + "einem 2\n" + "teil 1\n" + "das 7\n"
+                       + "speise 1\n" + "wenig 1\n" + "sterne 1\n" + "geb 1\n" 
+ "welten 1\n" + "alle 3\n" + "toent 1\n"
+                       + "gras 1\n" + "felsen 1\n" + "kette 1\n" + "ich 14\n" 
+ "fuer 2\n" + "als 3\n" + "mein 1\n"
+                       + "schoene 1\n" + "verzeih 1\n" + "schwankender 1\n" + 
"wie 9\n" + "menschlich 1\n" + "gaertner 1\n"
+                       + "taetigkeit 1\n" + "bange 1\n" + "liebe 1\n" + "sei 
2\n" + "seh 1\n" + "tollheit 1\n" + "am 6\n"
+                       + "michael 1\n" + "geist 1\n" + "ab 1\n" + "nahst 1\n" 
+ "vollendet 1\n" + "liebt 1\n" + "brausen 1\n"
+                       + "nase 1\n" + "erlaubt 1\n" + "weiss 2\n" + "schnellem 
1\n" + "deinem 1\n" + "gleichem 1\n"
+                       + "gaerung 1\n" + "dauernden 1\n" + "deines 1\n" + 
"vorgeschriebne 1\n" + "irdisch 1\n" + "worte 1\n"
+                       + "verehren 1\n" + "hohen 2\n" + "weise 2\n" + "kuenft 
1\n" + "werdende 1\n" + "wette 2\n" + "wuetend 1\n"
+                       + "erscheinung 1\n" + "gar 2\n" + "verlieren 1\n" + 
"braucht 1\n" + "weiter 1\n" + "trank 1\n"
+                       + "tierischer 1\n" + "wohl 1\n" + "verteilen 1\n" + 
"verhoehnt 1\n" + "schaeumt 1\n" + "himmelslichts 1\n"
+                       + "unbedingte 1\n" + "herzlich 1\n" + "anblick 2\n" + 
"nennt 1\n" + "gruent 1\n" + "bluet 1\n"
+                       + "leichnam 1\n" + "erschlaffen 1\n" + "jammertagen 
1\n" + "zieh 1\n" + "ihm 3\n" + "besondre 1\n"
+                       + "ihn 5\n" + "grossen 1\n" + "vollen 1\n" + "ihr 7\n" 
+ "boten 1\n" + "voller 1\n" + "singt 1\n"
+                       + "muhme 1\n" + "schon 1\n" + "last 1\n" + "kleine 1\n" 
+ "paradieseshelle 1\n" + "nein 1\n" + "echten 1\n"
+                       + "unter 1\n" + "bei 1\n" + "herr 11\n" + "gern 3\n" + 
"sphaerenlauf 1\n" + "stets 1\n" + "ganze 1\n"
+                       + "braechte 1\n" + "fordert 1\n" + "schoensten 1\n" + 
"herrlich 2\n" + "gegeben 1\n" + "allein 2\n"
+                       + "reichen 1\n" + "schauervoller 1\n" + "musst 1\n" + 
"recht 1\n" + "bleibt 1\n" + "pracht 1\n"
+                       + "treibt 1\n" + "befangen 1\n" + "was 2\n" + "menschen 
3\n" + "jede 1\n" + "hohe 1\n" + "tiefsten 1\n"
+                       + "bilden 1\n" + "drum 1\n" + "gibt 2\n" + "guter 1\n" 
+ "fuerwahr 1\n" + "im 3\n" + "grund 1\n" + "in 9\n"
+                       + "hoechste 1\n" + "schliesst 1\n" + "fels 1\n" + "steh 
1\n" + "euer 1\n" + "erster 1\n" + "ersten 3\n"
+                       + "goettersoehne 1\n" + "brechen 1\n" + "tiefen 1\n" + 
"frucht 1\n" + "kreis 1\n" + "siehst 1\n"
+                       + "wege 1\n" + "ist 8\n" + "zikaden 1\n" + "frischen 
1\n" + "ruh 1\n" + "deine 2\n" + "maus 1\n"
+                       + "brudersphaeren 1\n" + "nachher 1\n" + "euch 4\n" + 
"gnaden 1\n" + "anzuklagen 1\n" + "schlange 1\n"
+                       + "staerke 2\n" + "erde 4\n" + "verlaub 1\n" + "sanfte 
1\n" + "holden 1\n" + "sonst 1\n" + "treten 1\n"
+                       + "sahst 1\n" + "alten 1\n" + "um 1\n" + "wieder 1\n" + 
"alter 1\n" + "altes 1\n" + "nun 1\n" + "lieb 1\n"
+                       + "gesellen 1\n" + "erscheinen 1\n" + "wirkt 2\n" + 
"haettst 2\n" + "nur 7\n" + "tiefbewegte 1\n"
+                       + "lachen 2\n" + "drange 1\n" + "schlag 1\n" + "schein 
1\n" + "muss 1\n" + "verworren 1\n" + "weges 1\n"
+                       + "allen 1\n" + "gewoehnlich 1\n" + "alles 1\n" + "halb 
1\n" + "stuerme 1\n" + "springt 1\n" + "sollt 1\n"
+                       + "klarheit 1\n" + "so 6\n" + "erfassen 1\n" + 
"liedchen 1\n" + "prolog 1\n" + "zur 1\n" + "fressen 1\n"
+                       + "zum 1\n" + "faust 2\n" + "erzengel 2\n" + "jahre 
1\n" + "sonn 1\n" + "raphael 1\n" + "land 2\n"
+                       + "lang 1\n" + "gelange 1\n" + "lust 2\n" + "welt 1\n" 
+ "sehe 1\n" + "ihre 1\n" + "jedes 1\n"
+                       + "erfreut 1\n" + "seiner 1\n" + "denn 1\n" + "wandeln 
1\n" + "wechselt 1\n" + "jeden 1\n" + "dort 1\n"
+                       + "schlecht 1\n" + "wenigsten 1\n" + "wuerd 1\n" + 
"schranken 1\n" + "bewusst 2\n" + "seinem 2\n"
+                       + "gehasst 1\n" + "sein 1\n" + "meinem 1\n" + "meinen 
1\n" + "pathos 1\n" + "herrn 1\n" + "lange 2\n"
+                       + "herab 1\n" + "diesen 1\n" + "ihren 1\n" + "beruehmte 
1\n" + "goethe 1\n" + "tag 3\n" + "tier 1\n"
+                       + "quark 1\n" + "dank 1\n" + "seine 1\n" + "teufel 2\n" 
+ "zweck 1\n" + "wenn 7\n" + "soll 1\n"
+                       + "wirkung 1\n" + "erlaubnis 1\n" + "lebendig 1\n" + 
"uns 1\n" + "leicht 1\n" + "gewiss 1\n"
+                       + "schnell 1\n" + "und 29\n" + "gerne 1\n" + "rechten 
1\n" + "umher 2\n" + "vernunft 1\n" + "grase 1\n"
+                       + "nach 1\n" + "leben 1\n" + "gott 1\n" + "der 29\n" + 
"des 5\n" + "doktor 1\n" + "beschaemt 1\n"
+                       + "dreht 1\n" + "habe 1\n" + "sagen 2\n" + "bekennen 
1\n" + "dunklen 1\n" + "wettet 1\n" + "den 9\n"
+                       + "mephistopheles 9\n" + "dem 4\n" + "auch 4\n" + "kann 
2\n" + "armen 1\n" + "mir 9\n" + "strebt 1\n"
+                       + "gut 2\n" + "mit 11\n" + "bald 2\n" + "himmlischen 
1\n" + "himmel 3\n" + "noch 3\n" + "kannst 1\n"
+                       + "deinesgleichen 1\n" + "flammt 1\n" + "ergruenden 
2\n" + "nacht 1\n" + "scheint 1\n" + "ferne 2\n"
+                       + "tragoedie 1\n" + "abgewoehnt 1\n" + "reizt 1\n" + 
"geistern 1\n" + "nicht 10\n" + "sacht 1\n"
+                       + "unbegreiflich 2\n" + "schnelle 1\n" + "einmal 1\n" + 
"werd 1\n" + "werke 2\n" + "begraebt 1\n"
+                       + "knecht 1\n" + "rings 1\n" + "wird 1\n" + "katze 1\n" 
+ "huete 1\n" + "fortgerissen 1\n" + "gebt 1\n"
+                       + "huebsch 1\n" + "hast 1\n" + "irrt 1\n" + "befinde 
1\n" + "sind 2\n" + "fuehren 2\n" + "fliegt 1\n"
+                       + "ewig 3\n" + "brust 2\n" + "sonne 1\n" + "sprechen 
1\n" + "ein 3\n" + "strasse 1\n" + "von 8\n"
+                       + "ueberlassen 1\n" + "dir 4\n" + "vom 3\n" + "zu 11\n" 
+ "schwebt 1\n" + "die 22\n" + "vor 2\n"
+                       + "wangen 1\n" + "wettgesang 1\n" + "donnerschlags 1\n" 
+ "find 1\n" + "dich 3\n" + "umfass 1\n"
+                       + "verboten 1\n" + "laeg 1\n" + "nie 1\n" + "drei 2\n" 
+ "dauern 1\n" + "toren 1\n" + "dauert 1\n"
+                       + "verheeren 1\n" + "fliegend 1\n" + "aus 1\n" + "staub 
1\n" + "fluessen 1\n" + "haus 1\n" + "auf 5\n"
+                       + "dient 2\n" + "tiefer 1\n" + "naeh 1\n" + "zieren 
1\n";
+
+       public static final String STREAMING_COUNTS_AS_TUPLES = "(machen,1)\n" 
+ "(zeit,1)\n" + "(zeit,2)\n" + "(heerscharen,1)\n" + "(keiner,1)\n"
+                       + "(keiner,2)\n" + "(meine,1)\n" + "(meine,2)\n" + 
"(meine,3)\n" + "(fuehr,1)\n" + "(triumph,1)\n" + "(kommst,1)\n"
+                       + "(frei,1)\n" + "(schaffen,1)\n" + "(gesinde,1)\n" + 
"(langbeinigen,1)\n" + "(schalk,1)\n" + "(besser,1)\n" + "(solang,1)\n"
+                       + "(meer,1)\n" + "(meer,2)\n" + "(meer,3)\n" + 
"(meer,4)\n" + "(fragst,1)\n" + "(gabriel,1)\n" + "(selbst,1)\n" + 
"(selbst,2)\n"
+                       + "(bin,1)\n" + "(sich,1)\n" + "(sich,2)\n" + 
"(sich,3)\n" + "(sich,4)\n" + "(sich,5)\n" + "(sich,6)\n" + "(sich,7)\n" + 
"(du,1)\n"
+                       + "(du,2)\n" + "(du,3)\n" + "(du,4)\n" + "(du,5)\n" + 
"(du,6)\n" + "(du,7)\n" + "(du,8)\n" + "(du,9)\n" + "(du,10)\n" + "(du,11)\n"
+                       + "(sogar,1)\n" + "(geht,1)\n" + "(immer,1)\n" + 
"(immer,2)\n" + "(immer,3)\n" + "(immer,4)\n" + "(mensch,1)\n" + "(mensch,2)\n"
+                       + "(befestigt,1)\n" + "(lebt,1)\n" + "(lebt,2)\n" + 
"(mag,1)\n" + "(mag,2)\n" + "(mag,3)\n" + "(engeln,1)\n" + "(engeln,2)\n"
+                       + "(breiten,1)\n" + "(blitzendes,1)\n" + "(tags,1)\n" + 
"(sie,1)\n" + "(sie,2)\n" + "(plagen,1)\n" + "(plagen,2)\n" + "(allzu,1)\n"
+                       + "(meisten,1)\n" + "(o,1)\n" + "(pfade,1)\n" + 
"(kennst,1)\n" + "(nichts,1)\n" + "(nichts,2)\n" + "(nichts,3)\n" + 
"(gedanken,1)\n"
+                       + "(befriedigt,1)\n" + "(mich,1)\n" + "(mich,2)\n" + 
"(mich,3)\n" + "(mich,4)\n" + "(mich,5)\n" + "(mich,6)\n" + "(s,1)\n" + 
"(s,2)\n"
+                       + "(s,3)\n" + "(es,1)\n" + "(es,2)\n" + "(es,3)\n" + 
"(es,4)\n" + "(es,5)\n" + "(es,6)\n" + "(es,7)\n" + "(es,8)\n" + 
"(verneinen,1)\n"
+                       + "(er,1)\n" + "(er,2)\n" + "(er,3)\n" + "(er,4)\n" + 
"(er,5)\n" + "(er,6)\n" + "(er,7)\n" + "(er,8)\n" + "(er,9)\n" + "(er,10)\n"
+                       + "(er,11)\n" + "(er,12)\n" + "(er,13)\n" + 
"(gleich,1)\n" + "(baeumchen,1)\n" + "(donnergang,1)\n" + "(wunderlich,1)\n"
+                       + "(reise,1)\n" + "(urquell,1)\n" + "(doch,1)\n" + 
"(doch,2)\n" + "(doch,3)\n" + "(aufs,1)\n" + "(aufs,2)\n" + "(toten,1)\n"
+                       + "(niemals,1)\n" + "(eine,1)\n" + "(eine,2)\n" + 
"(hab,1)\n" + "(darfst,1)\n" + "(da,1)\n" + "(da,2)\n" + "(da,3)\n" + "(da,4)\n"
+                       + "(da,5)\n" + "(gen,1)\n" + "(einem,1)\n" + 
"(einem,2)\n" + "(teil,1)\n" + "(das,1)\n" + "(das,2)\n" + "(das,3)\n" + 
"(das,4)\n"
+                       + "(das,5)\n" + "(das,6)\n" + "(das,7)\n" + 
"(speise,1)\n" + "(wenig,1)\n" + "(sterne,1)\n" + "(geb,1)\n" + "(welten,1)\n"
+                       + "(alle,1)\n" + "(alle,2)\n" + "(alle,3)\n" + 
"(toent,1)\n" + "(gras,1)\n" + "(felsen,1)\n" + "(kette,1)\n" + "(ich,1)\n"
+                       + "(ich,2)\n" + "(ich,3)\n" + "(ich,4)\n" + "(ich,5)\n" 
+ "(ich,6)\n" + "(ich,7)\n" + "(ich,8)\n" + "(ich,9)\n" + "(ich,10)\n"
+                       + "(ich,11)\n" + "(ich,12)\n" + "(ich,13)\n" + 
"(ich,14)\n" + "(fuer,1)\n" + "(fuer,2)\n" + "(als,1)\n" + "(als,2)\n" + 
"(als,3)\n"
+                       + "(mein,1)\n" + "(schoene,1)\n" + "(verzeih,1)\n" + 
"(schwankender,1)\n" + "(wie,1)\n" + "(wie,2)\n" + "(wie,3)\n" + "(wie,4)\n"
+                       + "(wie,5)\n" + "(wie,6)\n" + "(wie,7)\n" + "(wie,8)\n" 
+ "(wie,9)\n" + "(menschlich,1)\n" + "(gaertner,1)\n" + "(taetigkeit,1)\n"
+                       + "(bange,1)\n" + "(liebe,1)\n" + "(sei,1)\n" + 
"(sei,2)\n" + "(seh,1)\n" + "(tollheit,1)\n" + "(am,1)\n" + "(am,2)\n" + 
"(am,3)\n"
+                       + "(am,4)\n" + "(am,5)\n" + "(am,6)\n" + 
"(michael,1)\n" + "(geist,1)\n" + "(ab,1)\n" + "(nahst,1)\n" + "(vollendet,1)\n"
+                       + "(liebt,1)\n" + "(brausen,1)\n" + "(nase,1)\n" + 
"(erlaubt,1)\n" + "(weiss,1)\n" + "(weiss,2)\n" + "(schnellem,1)\n"
+                       + "(deinem,1)\n" + "(gleichem,1)\n" + "(gaerung,1)\n" + 
"(dauernden,1)\n" + "(deines,1)\n" + "(vorgeschriebne,1)\n"
+                       + "(irdisch,1)\n" + "(worte,1)\n" + "(verehren,1)\n" + 
"(hohen,1)\n" + "(hohen,2)\n" + "(weise,1)\n" + "(weise,2)\n"
+                       + "(kuenft,1)\n" + "(werdende,1)\n" + "(wette,1)\n" + 
"(wette,2)\n" + "(wuetend,1)\n" + "(erscheinung,1)\n" + "(gar,1)\n"
+                       + "(gar,2)\n" + "(verlieren,1)\n" + "(braucht,1)\n" + 
"(weiter,1)\n" + "(trank,1)\n" + "(tierischer,1)\n" + "(wohl,1)\n"
+                       + "(verteilen,1)\n" + "(verhoehnt,1)\n" + 
"(schaeumt,1)\n" + "(himmelslichts,1)\n" + "(unbedingte,1)\n" + "(herzlich,1)\n"
+                       + "(anblick,1)\n" + "(anblick,2)\n" + "(nennt,1)\n" + 
"(gruent,1)\n" + "(bluet,1)\n" + "(leichnam,1)\n" + "(erschlaffen,1)\n"
+                       + "(jammertagen,1)\n" + "(zieh,1)\n" + "(ihm,1)\n" + 
"(ihm,2)\n" + "(ihm,3)\n" + "(besondre,1)\n" + "(ihn,1)\n" + "(ihn,2)\n"
+                       + "(ihn,3)\n" + "(ihn,4)\n" + "(ihn,5)\n" + 
"(grossen,1)\n" + "(vollen,1)\n" + "(ihr,1)\n" + "(ihr,2)\n" + "(ihr,3)\n"
+                       + "(ihr,4)\n" + "(ihr,5)\n" + "(ihr,6)\n" + "(ihr,7)\n" 
+ "(boten,1)\n" + "(voller,1)\n" + "(singt,1)\n" + "(muhme,1)\n"
+                       + "(schon,1)\n" + "(last,1)\n" + "(kleine,1)\n" + 
"(paradieseshelle,1)\n" + "(nein,1)\n" + "(echten,1)\n" + "(unter,1)\n"
+                       + "(bei,1)\n" + "(herr,1)\n" + "(herr,2)\n" + 
"(herr,3)\n" + "(herr,4)\n" + "(herr,5)\n" + "(herr,6)\n" + "(herr,7)\n"
+                       + "(herr,8)\n" + "(herr,9)\n" + "(herr,10)\n" + 
"(herr,11)\n" + "(gern,1)\n" + "(gern,2)\n" + "(gern,3)\n" + 
"(sphaerenlauf,1)\n"
+                       + "(stets,1)\n" + "(ganze,1)\n" + "(braechte,1)\n" + 
"(fordert,1)\n" + "(schoensten,1)\n" + "(herrlich,1)\n" + "(herrlich,2)\n"
+                       + "(gegeben,1)\n" + "(allein,1)\n" + "(allein,2)\n" + 
"(reichen,1)\n" + "(schauervoller,1)\n" + "(musst,1)\n" + "(recht,1)\n"
+                       + "(bleibt,1)\n" + "(pracht,1)\n" + "(treibt,1)\n" + 
"(befangen,1)\n" + "(was,1)\n" + "(was,2)\n" + "(menschen,1)\n"
+                       + "(menschen,2)\n" + "(menschen,3)\n" + "(jede,1)\n" + 
"(hohe,1)\n" + "(tiefsten,1)\n" + "(bilden,1)\n" + "(drum,1)\n"
+                       + "(gibt,1)\n" + "(gibt,2)\n" + "(guter,1)\n" + 
"(fuerwahr,1)\n" + "(im,1)\n" + "(im,2)\n" + "(im,3)\n" + "(grund,1)\n"
+                       + "(in,1)\n" + "(in,2)\n" + "(in,3)\n" + "(in,4)\n" + 
"(in,5)\n" + "(in,6)\n" + "(in,7)\n" + "(in,8)\n" + "(in,9)\n" + 
"(hoechste,1)\n"
+                       + "(schliesst,1)\n" + "(fels,1)\n" + "(steh,1)\n" + 
"(euer,1)\n" + "(erster,1)\n" + "(ersten,1)\n" + "(ersten,2)\n" + "(ersten,3)\n"
+                       + "(goettersoehne,1)\n" + "(brechen,1)\n" + 
"(tiefen,1)\n" + "(frucht,1)\n" + "(kreis,1)\n" + "(siehst,1)\n" + "(wege,1)\n"
+                       + "(ist,1)\n" + "(ist,2)\n" + "(ist,3)\n" + "(ist,4)\n" 
+ "(ist,5)\n" + "(ist,6)\n" + "(ist,7)\n" + "(ist,8)\n" + "(zikaden,1)\n"
+                       + "(frischen,1)\n" + "(ruh,1)\n" + "(deine,1)\n" + 
"(deine,2)\n" + "(maus,1)\n" + "(brudersphaeren,1)\n" + "(nachher,1)\n"
+                       + "(euch,1)\n" + "(euch,2)\n" + "(euch,3)\n" + 
"(euch,4)\n" + "(gnaden,1)\n" + "(anzuklagen,1)\n" + "(schlange,1)\n" + 
"(staerke,1)\n"
+                       + "(staerke,2)\n" + "(erde,1)\n" + "(erde,2)\n" + 
"(erde,3)\n" + "(erde,4)\n" + "(verlaub,1)\n" + "(sanfte,1)\n" + "(holden,1)\n"
+                       + "(sonst,1)\n" + "(treten,1)\n" + "(sahst,1)\n" + 
"(alten,1)\n" + "(um,1)\n" + "(wieder,1)\n" + "(alter,1)\n" + "(altes,1)\n"
+                       + "(nun,1)\n" + "(lieb,1)\n" + "(gesellen,1)\n" + 
"(erscheinen,1)\n" + "(wirkt,1)\n" + "(wirkt,2)\n" + "(haettst,1)\n" + 
"(haettst,2)\n"
+                       + "(nur,1)\n" + "(nur,2)\n" + "(nur,3)\n" + "(nur,4)\n" 
+ "(nur,5)\n" + "(nur,6)\n" + "(nur,7)\n" + "(tiefbewegte,1)\n" + "(lachen,1)\n"
+                       + "(lachen,2)\n" + "(drange,1)\n" + "(schlag,1)\n" + 
"(schein,1)\n" + "(muss,1)\n" + "(verworren,1)\n" + "(weges,1)\n" + 
"(allen,1)\n"
+                       + "(gewoehnlich,1)\n" + "(alles,1)\n" + "(halb,1)\n" + 
"(stuerme,1)\n" + "(springt,1)\n" + "(sollt,1)\n" + "(klarheit,1)\n"
+                       + "(so,1)\n" + "(so,2)\n" + "(so,3)\n" + "(so,4)\n" + 
"(so,5)\n" + "(so,6)\n" + "(erfassen,1)\n" + "(liedchen,1)\n" + "(prolog,1)\n"
+                       + "(zur,1)\n" + "(fressen,1)\n" + "(zum,1)\n" + 
"(faust,1)\n" + "(faust,2)\n" + "(erzengel,1)\n" + "(erzengel,2)\n" + 
"(jahre,1)\n"
+                       + "(sonn,1)\n" + "(raphael,1)\n" + "(land,1)\n" + 
"(land,2)\n" + "(lang,1)\n" + "(gelange,1)\n" + "(lust,1)\n" + "(lust,2)\n"
+                       + "(welt,1)\n" + "(sehe,1)\n" + "(ihre,1)\n" + 
"(jedes,1)\n" + "(erfreut,1)\n" + "(seiner,1)\n" + "(denn,1)\n" + 
"(wandeln,1)\n"
+                       + "(wechselt,1)\n" + "(jeden,1)\n" + "(dort,1)\n" + 
"(schlecht,1)\n" + "(wenigsten,1)\n" + "(wuerd,1)\n" + "(schranken,1)\n"
+                       + "(bewusst,1)\n" + "(bewusst,2)\n" + "(seinem,1)\n" + 
"(seinem,2)\n" + "(gehasst,1)\n" + "(sein,1)\n" + "(meinem,1)\n"
+                       + "(meinen,1)\n" + "(pathos,1)\n" + "(herrn,1)\n" + 
"(lange,1)\n" + "(lange,2)\n" + "(herab,1)\n" + "(diesen,1)\n" + "(ihren,1)\n"
+                       + "(beruehmte,1)\n" + "(goethe,1)\n" + "(tag,1)\n" + 
"(tag,2)\n" + "(tag,3)\n" + "(tier,1)\n" + "(quark,1)\n" + "(dank,1)\n"
+                       + "(seine,1)\n" + "(teufel,1)\n" + "(teufel,2)\n" + 
"(zweck,1)\n" + "(wenn,1)\n" + "(wenn,2)\n" + "(wenn,3)\n" + "(wenn,4)\n"
+                       + "(wenn,5)\n" + "(wenn,6)\n" + "(wenn,7)\n" + 
"(soll,1)\n" + "(wirkung,1)\n" + "(erlaubnis,1)\n" + "(lebendig,1)\n" + 
"(uns,1)\n"
+                       + "(leicht,1)\n" + "(gewiss,1)\n" + "(schnell,1)\n" + 
"(und,1)\n" + "(und,2)\n" + "(und,3)\n" + "(und,4)\n" + "(und,5)\n" + 
"(und,6)\n"
+                       + "(und,7)\n" + "(und,8)\n" + "(und,9)\n" + 
"(und,10)\n" + "(und,11)\n" + "(und,12)\n" + "(und,13)\n" + "(und,14)\n" + 
"(und,15)\n"
+                       + "(und,16)\n" + "(und,17)\n" + "(und,18)\n" + 
"(und,19)\n" + "(und,20)\n" + "(und,21)\n" + "(und,22)\n" + "(und,23)\n" + 
"(und,24)\n"
+                       + "(und,25)\n" + "(und,26)\n" + "(und,27)\n" + 
"(und,28)\n" + "(und,29)\n" + "(gerne,1)\n" + "(rechten,1)\n" + "(umher,1)\n" + 
"(umher,2)\n"
+                       + "(vernunft,1)\n" + "(grase,1)\n" + "(nach,1)\n" + 
"(leben,1)\n" + "(gott,1)\n" + "(der,1)\n" + "(der,2)\n" + "(der,3)\n" + 
"(der,4)\n"
+                       + "(der,5)\n" + "(der,6)\n" + "(der,7)\n" + "(der,8)\n" 
+ "(der,9)\n" + "(der,10)\n" + "(der,11)\n" + "(der,12)\n" + "(der,13)\n"
+                       + "(der,14)\n" + "(der,15)\n" + "(der,16)\n" + 
"(der,17)\n" + "(der,18)\n" + "(der,19)\n" + "(der,20)\n" + "(der,21)\n" + 
"(der,22)\n"
+                       + "(der,23)\n" + "(der,24)\n" + "(der,25)\n" + 
"(der,26)\n" + "(der,27)\n" + "(der,28)\n" + "(der,29)\n" + "(des,1)\n" + 
"(des,2)\n"
+                       + "(des,3)\n" + "(des,4)\n" + "(des,5)\n" + 
"(doktor,1)\n" + "(beschaemt,1)\n" + "(dreht,1)\n" + "(habe,1)\n" + 
"(sagen,1)\n" + "(sagen,2)\n"
+                       + "(bekennen,1)\n" + "(dunklen,1)\n" + "(wettet,1)\n" + 
"(den,1)\n" + "(den,2)\n" + "(den,3)\n" + "(den,4)\n" + "(den,5)\n" + 
"(den,6)\n"
+                       + "(den,7)\n" + "(den,8)\n" + "(den,9)\n" + 
"(mephistopheles,1)\n" + "(mephistopheles,2)\n" + "(mephistopheles,3)\n"
+                       + "(mephistopheles,4)\n" + "(mephistopheles,5)\n" + 
"(mephistopheles,6)\n" + "(mephistopheles,7)\n" + "(mephistopheles,8)\n"
+                       + "(mephistopheles,9)\n" + "(dem,1)\n" + "(dem,2)\n" + 
"(dem,3)\n" + "(dem,4)\n" + "(auch,1)\n" + "(auch,2)\n" + "(auch,3)\n" + 
"(auch,4)\n"
+                       + "(kann,1)\n" + "(kann,2)\n" + "(armen,1)\n" + 
"(mir,1)\n" + "(mir,2)\n" + "(mir,3)\n" + "(mir,4)\n" + "(mir,5)\n" + 
"(mir,6)\n" + "(mir,7)\n"
+                       + "(mir,8)\n" + "(mir,9)\n" + "(strebt,1)\n" + 
"(gut,1)\n" + "(gut,2)\n" + "(mit,1)\n" + "(mit,2)\n" + "(mit,3)\n" + 
"(mit,4)\n" + "(mit,5)\n"
+                       + "(mit,6)\n" + "(mit,7)\n" + "(mit,8)\n" + "(mit,9)\n" 
+ "(mit,10)\n" + "(mit,11)\n" + "(bald,1)\n" + "(bald,2)\n" + 
"(himmlischen,1)\n"
+                       + "(himmel,1)\n" + "(himmel,2)\n" + "(himmel,3)\n" + 
"(noch,1)\n" + "(noch,2)\n" + "(noch,3)\n" + "(kannst,1)\n" + 
"(deinesgleichen,1)\n"
+                       + "(flammt,1)\n" + "(ergruenden,1)\n" + 
"(ergruenden,2)\n" + "(nacht,1)\n" + "(scheint,1)\n" + "(ferne,1)\n" + 
"(ferne,2)\n"
+                       + "(tragoedie,1)\n" + "(abgewoehnt,1)\n" + 
"(reizt,1)\n" + "(geistern,1)\n" + "(nicht,1)\n" + "(nicht,2)\n" + 
"(nicht,3)\n" + "(nicht,4)\n"
+                       + "(nicht,5)\n" + "(nicht,6)\n" + "(nicht,7)\n" + 
"(nicht,8)\n" + "(nicht,9)\n" + "(nicht,10)\n" + "(sacht,1)\n" + 
"(unbegreiflich,1)\n"
+                       + "(unbegreiflich,2)\n" + "(schnelle,1)\n" + 
"(einmal,1)\n" + "(werd,1)\n" + "(werke,1)\n" + "(werke,2)\n" + "(begraebt,1)\n"
+                       + "(knecht,1)\n" + "(rings,1)\n" + "(wird,1)\n" + 
"(katze,1)\n" + "(huete,1)\n" + "(fortgerissen,1)\n" + "(gebt,1)\n" + 
"(huebsch,1)\n"
+                       + "(hast,1)\n" + "(irrt,1)\n" + "(befinde,1)\n" + 
"(sind,1)\n" + "(sind,2)\n" + "(fuehren,1)\n" + "(fuehren,2)\n" + "(fliegt,1)\n"
+                       + "(ewig,1)\n" + "(ewig,2)\n" + "(ewig,3)\n" + 
"(brust,1)\n" + "(brust,2)\n" + "(sonne,1)\n" + "(sprechen,1)\n" + "(ein,1)\n" 
+ "(ein,2)\n"
+                       + "(ein,3)\n" + "(strasse,1)\n" + "(von,1)\n" + 
"(von,2)\n" + "(von,3)\n" + "(von,4)\n" + "(von,5)\n" + "(von,6)\n" + 
"(von,7)\n" + "(von,8)\n"
+                       + "(ueberlassen,1)\n" + "(dir,1)\n" + "(dir,2)\n" + 
"(dir,3)\n" + "(dir,4)\n" + "(vom,1)\n" + "(vom,2)\n" + "(vom,3)\n" + 
"(zu,1)\n" + "(zu,2)\n"
+                       + "(zu,3)\n" + "(zu,4)\n" + "(zu,5)\n" + "(zu,6)\n" + 
"(zu,7)\n" + "(zu,8)\n" + "(zu,9)\n" + "(zu,10)\n" + "(zu,11)\n" + 
"(schwebt,1)\n"
+                       + "(die,1)\n" + "(die,2)\n" + "(die,3)\n" + "(die,4)\n" 
+ "(die,5)\n" + "(die,6)\n" + "(die,7)\n" + "(die,8)\n" + "(die,9)\n" + 
"(die,10)\n"
+                       + "(die,11)\n" + "(die,12)\n" + "(die,13)\n" + 
"(die,14)\n" + "(die,15)\n" + "(die,16)\n" + "(die,17)\n" + "(die,18)\n" + 
"(die,19)\n"
+                       + "(die,20)\n" + "(die,21)\n" + "(die,22)\n" + 
"(vor,1)\n" + "(vor,2)\n" + "(wangen,1)\n" + "(wettgesang,1)\n" + 
"(donnerschlags,1)\n"
+                       + "(find,1)\n" + "(dich,1)\n" + "(dich,2)\n" + 
"(dich,3)\n" + "(umfass,1)\n" + "(verboten,1)\n" + "(laeg,1)\n" + "(nie,1)\n" + 
"(drei,1)\n"
+                       + "(drei,2)\n" + "(dauern,1)\n" + "(toren,1)\n" + 
"(dauert,1)\n" + "(verheeren,1)\n" + "(fliegend,1)\n" + "(aus,1)\n" + 
"(staub,1)\n"
+                       + "(fluessen,1)\n" + "(haus,1)\n" + "(auf,1)\n" + 
"(auf,2)\n" + "(auf,3)\n" + "(auf,4)\n" + "(auf,5)\n" + "(dient,1)\n" + 
"(dient,2)\n"
+                       + "(tiefer,1)\n" + "(naeh,1)\n" + "(zieren,1)\n";
+
+       public static final String COUNTS_AS_TUPLES = "(machen,1)\n" + 
"(zeit,2)\n" + "(heerscharen,1)\n" + "(keiner,2)\n" + "(meine,3)\n"
+                       + "(fuehr,1)\n" + "(triumph,1)\n" + "(kommst,1)\n" + 
"(frei,1)\n" + "(schaffen,1)\n" + "(gesinde,1)\n"
+                       + "(langbeinigen,1)\n" + "(schalk,1)\n" + 
"(besser,1)\n" + "(solang,1)\n" + "(meer,4)\n" + "(fragst,1)\n"
+                       + "(gabriel,1)\n" + "(selbst,2)\n" + "(bin,1)\n" + 
"(sich,7)\n" + "(du,11)\n" + "(sogar,1)\n" + "(geht,1)\n"
+                       + "(immer,4)\n" + "(mensch,2)\n" + "(befestigt,1)\n" + 
"(lebt,2)\n" + "(mag,3)\n" + "(engeln,2)\n" + "(breiten,1)\n"
+                       + "(blitzendes,1)\n" + "(tags,1)\n" + "(sie,2)\n" + 
"(plagen,2)\n" + "(allzu,1)\n" + "(meisten,1)\n" + "(o,1)\n"
+                       + "(pfade,1)\n" + "(kennst,1)\n" + "(nichts,3)\n" + 
"(gedanken,1)\n" + "(befriedigt,1)\n" + "(mich,6)\n" + "(s,3)\n"
+                       + "(es,8)\n" + "(verneinen,1)\n" + "(er,13)\n" + 
"(gleich,1)\n" + "(baeumchen,1)\n" + "(donnergang,1)\n"
+                       + "(wunderlich,1)\n" + "(reise,1)\n" + "(urquell,1)\n" 
+ "(doch,3)\n" + "(aufs,2)\n" + "(toten,1)\n" + "(niemals,1)\n"
+                       + "(eine,2)\n" + "(hab,1)\n" + "(darfst,1)\n" + 
"(da,5)\n" + "(gen,1)\n" + "(einem,2)\n" + "(teil,1)\n" + "(das,7)\n"
+                       + "(speise,1)\n" + "(wenig,1)\n" + "(sterne,1)\n" + 
"(geb,1)\n" + "(welten,1)\n" + "(alle,3)\n" + "(toent,1)\n"
+                       + "(gras,1)\n" + "(felsen,1)\n" + "(kette,1)\n" + 
"(ich,14)\n" + "(fuer,2)\n" + "(als,3)\n" + "(mein,1)\n"
+                       + "(schoene,1)\n" + "(verzeih,1)\n" + 
"(schwankender,1)\n" + "(wie,9)\n" + "(menschlich,1)\n" + "(gaertner,1)\n"
+                       + "(taetigkeit,1)\n" + "(bange,1)\n" + "(liebe,1)\n" + 
"(sei,2)\n" + "(seh,1)\n" + "(tollheit,1)\n" + "(am,6)\n"
+                       + "(michael,1)\n" + "(geist,1)\n" + "(ab,1)\n" + 
"(nahst,1)\n" + "(vollendet,1)\n" + "(liebt,1)\n" + "(brausen,1)\n"
+                       + "(nase,1)\n" + "(erlaubt,1)\n" + "(weiss,2)\n" + 
"(schnellem,1)\n" + "(deinem,1)\n" + "(gleichem,1)\n"
+                       + "(gaerung,1)\n" + "(dauernden,1)\n" + "(deines,1)\n" 
+ "(vorgeschriebne,1)\n" + "(irdisch,1)\n" + "(worte,1)\n"
+                       + "(verehren,1)\n" + "(hohen,2)\n" + "(weise,2)\n" + 
"(kuenft,1)\n" + "(werdende,1)\n" + "(wette,2)\n" + "(wuetend,1)\n"
+                       + "(erscheinung,1)\n" + "(gar,2)\n" + "(verlieren,1)\n" 
+ "(braucht,1)\n" + "(weiter,1)\n" + "(trank,1)\n"
+                       + "(tierischer,1)\n" + "(wohl,1)\n" + "(verteilen,1)\n" 
+ "(verhoehnt,1)\n" + "(schaeumt,1)\n" + "(himmelslichts,1)\n"
+                       + "(unbedingte,1)\n" + "(herzlich,1)\n" + 
"(anblick,2)\n" + "(nennt,1)\n" + "(gruent,1)\n" + "(bluet,1)\n"
+                       + "(leichnam,1)\n" + "(erschlaffen,1)\n" + 
"(jammertagen,1)\n" + "(zieh,1)\n" + "(ihm,3)\n" + "(besondre,1)\n"
+                       + "(ihn,5)\n" + "(grossen,1)\n" + "(vollen,1)\n" + 
"(ihr,7)\n" + "(boten,1)\n" + "(voller,1)\n" + "(singt,1)\n"
+                       + "(muhme,1)\n" + "(schon,1)\n" + "(last,1)\n" + 
"(kleine,1)\n" + "(paradieseshelle,1)\n" + "(nein,1)\n" + "(echten,1)\n"
+                       + "(unter,1)\n" + "(bei,1)\n" + "(herr,11)\n" + 
"(gern,3)\n" + "(sphaerenlauf,1)\n" + "(stets,1)\n" + "(ganze,1)\n"
+                       + "(braechte,1)\n" + "(fordert,1)\n" + 
"(schoensten,1)\n" + "(herrlich,2)\n" + "(gegeben,1)\n" + "(allein,2)\n"
+                       + "(reichen,1)\n" + "(schauervoller,1)\n" + 
"(musst,1)\n" + "(recht,1)\n" + "(bleibt,1)\n" + "(pracht,1)\n"
+                       + "(treibt,1)\n" + "(befangen,1)\n" + "(was,2)\n" + 
"(menschen,3)\n" + "(jede,1)\n" + "(hohe,1)\n" + "(tiefsten,1)\n"
+                       + "(bilden,1)\n" + "(drum,1)\n" + "(gibt,2)\n" + 
"(guter,1)\n" + "(fuerwahr,1)\n" + "(im,3)\n" + "(grund,1)\n" + "(in,9)\n"
+                       + "(hoechste,1)\n" + "(schliesst,1)\n" + "(fels,1)\n" + 
"(steh,1)\n" + "(euer,1)\n" + "(erster,1)\n" + "(ersten,3)\n"
+                       + "(goettersoehne,1)\n" + "(brechen,1)\n" + 
"(tiefen,1)\n" + "(frucht,1)\n" + "(kreis,1)\n" + "(siehst,1)\n"
+                       + "(wege,1)\n" + "(ist,8)\n" + "(zikaden,1)\n" + 
"(frischen,1)\n" + "(ruh,1)\n" + "(deine,2)\n" + "(maus,1)\n"
+                       + "(brudersphaeren,1)\n" + "(nachher,1)\n" + 
"(euch,4)\n" + "(gnaden,1)\n" + "(anzuklagen,1)\n" + "(schlange,1)\n"
+                       + "(staerke,2)\n" + "(erde,4)\n" + "(verlaub,1)\n" + 
"(sanfte,1)\n" + "(holden,1)\n" + "(sonst,1)\n" + "(treten,1)\n"
+                       + "(sahst,1)\n" + "(alten,1)\n" + "(um,1)\n" + 
"(wieder,1)\n" + "(alter,1)\n" + "(altes,1)\n" + "(nun,1)\n" + "(lieb,1)\n"
+                       + "(gesellen,1)\n" + "(erscheinen,1)\n" + "(wirkt,2)\n" 
+ "(haettst,2)\n" + "(nur,7)\n" + "(tiefbewegte,1)\n"
+                       + "(lachen,2)\n" + "(drange,1)\n" + "(schlag,1)\n" + 
"(schein,1)\n" + "(muss,1)\n" + "(verworren,1)\n" + "(weges,1)\n"
+                       + "(allen,1)\n" + "(gewoehnlich,1)\n" + "(alles,1)\n" + 
"(halb,1)\n" + "(stuerme,1)\n" + "(springt,1)\n" + "(sollt,1)\n"
+                       + "(klarheit,1)\n" + "(so,6)\n" + "(erfassen,1)\n" + 
"(liedchen,1)\n" + "(prolog,1)\n" + "(zur,1)\n" + "(fressen,1)\n"
+                       + "(zum,1)\n" + "(faust,2)\n" + "(erzengel,2)\n" + 
"(jahre,1)\n" + "(sonn,1)\n" + "(raphael,1)\n" + "(land,2)\n"
+                       + "(lang,1)\n" + "(gelange,1)\n" + "(lust,2)\n" + 
"(welt,1)\n" + "(sehe,1)\n" + "(ihre,1)\n" + "(jedes,1)\n"
+                       + "(erfreut,1)\n" + "(seiner,1)\n" + "(denn,1)\n" + 
"(wandeln,1)\n" + "(wechselt,1)\n" + "(jeden,1)\n" + "(dort,1)\n"
+                       + "(schlecht,1)\n" + "(wenigsten,1)\n" + "(wuerd,1)\n" 
+ "(schranken,1)\n" + "(bewusst,2)\n" + "(seinem,2)\n"
+                       + "(gehasst,1)\n" + "(sein,1)\n" + "(meinem,1)\n" + 
"(meinen,1)\n" + "(pathos,1)\n" + "(herrn,1)\n" + "(lange,2)\n"
+                       + "(herab,1)\n" + "(diesen,1)\n" + "(ihren,1)\n" + 
"(beruehmte,1)\n" + "(goethe,1)\n" + "(tag,3)\n" + "(tier,1)\n"
+                       + "(quark,1)\n" + "(dank,1)\n" + "(seine,1)\n" + 
"(teufel,2)\n" + "(zweck,1)\n" + "(wenn,7)\n" + "(soll,1)\n"
+                       + "(wirkung,1)\n" + "(erlaubnis,1)\n" + 
"(lebendig,1)\n" + "(uns,1)\n" + "(leicht,1)\n" + "(gewiss,1)\n"
+                       + "(schnell,1)\n" + "(und,29)\n" + "(gerne,1)\n" + 
"(rechten,1)\n" + "(umher,2)\n" + "(vernunft,1)\n" + "(grase,1)\n"
+                       + "(nach,1)\n" + "(leben,1)\n" + "(gott,1)\n" + 
"(der,29)\n" + "(des,5)\n" + "(doktor,1)\n" + "(beschaemt,1)\n"
+                       + "(dreht,1)\n" + "(habe,1)\n" + "(sagen,2)\n" + 
"(bekennen,1)\n" + "(dunklen,1)\n" + "(wettet,1)\n" + "(den,9)\n"
+                       + "(mephistopheles,9)\n" + "(dem,4)\n" + "(auch,4)\n" + 
"(kann,2)\n" + "(armen,1)\n" + "(mir,9)\n" + "(strebt,1)\n"
+                       + "(gut,2)\n" + "(mit,11)\n" + "(bald,2)\n" + 
"(himmlischen,1)\n" + "(himmel,3)\n" + "(noch,3)\n" + "(kannst,1)\n"
+                       + "(deinesgleichen,1)\n" + "(flammt,1)\n" + 
"(ergruenden,2)\n" + "(nacht,1)\n" + "(scheint,1)\n" + "(ferne,2)\n"
+                       + "(tragoedie,1)\n" + "(abgewoehnt,1)\n" + 
"(reizt,1)\n" + "(geistern,1)\n" + "(nicht,10)\n" + "(sacht,1)\n"
+                       + "(unbegreiflich,2)\n" + "(schnelle,1)\n" + 
"(einmal,1)\n" + "(werd,1)\n" + "(werke,2)\n" + "(begraebt,1)\n"
+                       + "(knecht,1)\n" + "(rings,1)\n" + "(wird,1)\n" + 
"(katze,1)\n" + "(huete,1)\n" + "(fortgerissen,1)\n" + "(gebt,1)\n"
+                       + "(huebsch,1)\n" + "(hast,1)\n" + "(irrt,1)\n" + 
"(befinde,1)\n" + "(sind,2)\n" + "(fuehren,2)\n" + "(fliegt,1)\n"
+                       + "(ewig,3)\n" + "(brust,2)\n" + "(sonne,1)\n" + 
"(sprechen,1)\n" + "(ein,3)\n" + "(strasse,1)\n" + "(von,8)\n"
+                       + "(ueberlassen,1)\n" + "(dir,4)\n" + "(vom,3)\n" + 
"(zu,11)\n" + "(schwebt,1)\n" + "(die,22)\n" + "(vor,2)\n"
+                       + "(wangen,1)\n" + "(wettgesang,1)\n" + 
"(donnerschlags,1)\n" + "(find,1)\n" + "(dich,3)\n" + "(umfass,1)\n"
+                       + "(verboten,1)\n" + "(laeg,1)\n" + "(nie,1)\n" + 
"(drei,2)\n" + "(dauern,1)\n" + "(toren,1)\n" + "(dauert,1)\n"
+                       + "(verheeren,1)\n" + "(fliegend,1)\n" + "(aus,1)\n" + 
"(staub,1)\n" + "(fluessen,1)\n" + "(haus,1)\n" + "(auf,5)\n"
+                       + "(dient,2)\n" + "(tiefer,1)\n" + "(naeh,1)\n" + 
"(zieren,1)\n";
+
+       private WordCountData() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/AbstractTestBase.java
new file mode 100644
index 0000000..c2da691
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.configuration.Configuration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+/**
+ * A base class for tests that run test programs in a Flink mini cluster.
+ */
+public abstract class AbstractTestBase extends TestBaseUtils {
+       
+       /** Configuration to start the testing cluster with */
+       protected final Configuration config;
+       
+       private final List<File> tempFiles;
+       
+       private final FiniteDuration timeout;
+
+       protected int taskManagerNumSlots = 1;
+
+       protected int numTaskManagers = 1;
+       
+       /** The mini cluster that runs the test programs */
+       protected ForkableFlinkMiniCluster executor;
+       
+
+       public AbstractTestBase(Configuration config) {
+               this.config = Objects.requireNonNull(config);
+               this.tempFiles = new ArrayList<File>();
+
+               timeout = AkkaUtils.getTimeout(config);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Local Test Cluster Life Cycle
+       // 
--------------------------------------------------------------------------------------------
+
+       public void startCluster() throws Exception {
+               this.executor = startCluster(
+                       numTaskManagers,
+                       taskManagerNumSlots,
+                       false,
+                       false,
+                       true);
+       }
+
+       public void stopCluster() throws Exception {
+               stopCluster(executor, timeout);
+               deleteAllTempFiles();
+       }
+
+       //------------------
+       // Accessors
+       //------------------
+
+       public int getTaskManagerNumSlots() {
+               return taskManagerNumSlots;
+       }
+
+       public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+               this.taskManagerNumSlots = taskManagerNumSlots;
+       }
+
+       public int getNumTaskManagers() {
+               return numTaskManagers;
+       }
+
+       public void setNumTaskManagers(int numTaskManagers) {
+               this.numTaskManagers = numTaskManagers;
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Temporary File Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       public String getTempDirPath(String dirName) throws IOException {
+               File f = createAndRegisterTempFile(dirName);
+               return f.toURI().toString();
+       }
+
+       public String getTempFilePath(String fileName) throws IOException {
+               File f = createAndRegisterTempFile(fileName);
+               return f.toURI().toString();
+       }
+
+       public String createTempFile(String fileName, String contents) throws 
IOException {
+               File f = createAndRegisterTempFile(fileName);
+               Files.write(contents, f, Charsets.UTF_8);
+               return f.toURI().toString();
+       }
+
+       public File createAndRegisterTempFile(String fileName) throws 
IOException {
+               File baseDir = new File(System.getProperty("java.io.tmpdir"));
+               File f = new File(baseDir, this.getClass().getName() + "-" + 
fileName);
+
+               if (f.exists()) {
+                       deleteRecursively(f);
+               }
+
+               File parentToDelete = f;
+               while (true) {
+                       File parent = parentToDelete.getParentFile();
+                       if (parent == null) {
+                               throw new IOException("Missed temp dir while 
traversing parents of a temp file.");
+                       }
+                       if (parent.equals(baseDir)) {
+                               break;
+                       }
+                       parentToDelete = parent;
+               }
+
+               Files.createParentDirs(f);
+               this.tempFiles.add(parentToDelete);
+               return f;
+       }
+
+       private void deleteAllTempFiles() throws IOException {
+               for (File f : this.tempFiles) {
+                       if (f.exists()) {
+                               deleteRecursively(f);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/CollectionTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/CollectionTestEnvironment.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/CollectionTestEnvironment.java
new file mode 100644
index 0000000..e56c7e8
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/CollectionTestEnvironment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+
+public class CollectionTestEnvironment extends CollectionEnvironment {
+
+       private CollectionTestEnvironment lastEnv = null;
+
+       @Override
+       public JobExecutionResult getLastJobExecutionResult() {
+               if (lastEnv == null) {
+                       return this.lastJobExecutionResult;
+               }
+               else {
+                       return lastEnv.getLastJobExecutionResult();
+               }
+       }
+
+       @Override
+       public JobExecutionResult execute() throws Exception {
+               return execute("test job");
+       }
+
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               JobExecutionResult result = super.execute(jobName);
+               this.lastJobExecutionResult = result;
+               return result;
+       }
+
+       protected void setAsContext() {
+               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
+                       @Override
+                       public ExecutionEnvironment 
createExecutionEnvironment() {
+                               lastEnv = new CollectionTestEnvironment();
+                               return lastEnv;
+                       }
+               };
+
+               initializeContextEnvironment(factory);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/JavaProgramTestBase.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/JavaProgramTestBase.java
new file mode 100644
index 0000000..e639c80
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import java.util.Comparator;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.java.tuple.Tuple;
+
+
+public abstract class JavaProgramTestBase extends AbstractTestBase {
+
+       private static final int DEFAULT_PARALLELISM = 4;
+       
+       private JobExecutionResult latestExecutionResult;
+       
+       private int parallelism = DEFAULT_PARALLELISM;
+
+       /**
+        * The number of times a test should be repeated.
+        *
+        * <p> This is useful for runtime changes, which affect resource 
management. Running certain
+        * tests repeatedly might help to discover resource leaks, race 
conditions etc.
+        */
+       private int numberOfTestRepetitions = 1;
+
+       private boolean isCollectionExecution;
+
+       public JavaProgramTestBase() {
+               this(new Configuration());
+       }
+       
+       public JavaProgramTestBase(Configuration config) {
+               super(config);
+               setTaskManagerNumSlots(parallelism);
+       }
+       
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               setTaskManagerNumSlots(parallelism);
+       }
+
+       public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+               this.numberOfTestRepetitions = numberOfTestRepetitions;
+       }
+       
+       public int getParallelism() {
+               return isCollectionExecution ? 1 : parallelism;
+       }
+       
+       public JobExecutionResult getLatestExecutionResult() {
+               return this.latestExecutionResult;
+       }
+       
+       public boolean isCollectionExecution() {
+               return isCollectionExecution;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Methods to create the test program and for pre- and post- test work
+       // 
--------------------------------------------------------------------------------------------
+
+       protected abstract void testProgram() throws Exception;
+
+       protected void preSubmit() throws Exception {}
+       
+       protected void postSubmit() throws Exception {}
+       
+       protected boolean skipCollectionExecution() {
+               return false;
+       };
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Test entry point
+       // 
--------------------------------------------------------------------------------------------
+
+       @Test
+       public void testJobWithObjectReuse() throws Exception {
+               isCollectionExecution = false;
+               
+               startCluster();
+               try {
+                       // pre-submit
+                       try {
+                               preSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               Assert.fail("Pre-submit work caused an error: " 
+ e.getMessage());
+                       }
+                       
+                       // prepare the test environment
+                       TestEnvironment env = new 
TestEnvironment(this.executor, this.parallelism);
+                       env.getConfig().enableObjectReuse();
+                       env.setAsContext();
+
+                       // Possibly run the test multiple times
+                       for (int i = 0; i < numberOfTestRepetitions; i++) {
+                               // call the test program
+                               try {
+                                       testProgram();
+                                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
+                               }
+                               catch (Exception e) {
+                                       System.err.println(e.getMessage());
+                                       e.printStackTrace();
+                                       Assert.fail("Error while calling the 
test program: " + e.getMessage());
+                               }
+
+                               Assert.assertNotNull("The test program never 
triggered an execution.",
+                                               this.latestExecutionResult);
+                       }
+                       
+                       // post-submit
+                       try {
+                               postSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               Assert.fail("Post-submit work caused an error: 
" + e.getMessage());
+                       }
+               } finally {
+                       stopCluster();
+               }
+       }
+
+       @Test
+       public void testJobWithoutObjectReuse() throws Exception {
+               isCollectionExecution = false;
+
+               startCluster();
+               try {
+                       // pre-submit
+                       try {
+                               preSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               Assert.fail("Pre-submit work caused an error: " 
+ e.getMessage());
+                       }
+
+                       // prepare the test environment
+                       TestEnvironment env = new 
TestEnvironment(this.executor, this.parallelism);
+                       env.getConfig().disableObjectReuse();
+                       env.setAsContext();
+
+                       // Possibly run the test multiple times
+                       for (int i = 0; i < numberOfTestRepetitions; i++) {
+                               // call the test program
+                               try {
+                                       testProgram();
+                                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
+                               }
+                               catch (Exception e) {
+                                       System.err.println(e.getMessage());
+                                       e.printStackTrace();
+                                       Assert.fail("Error while calling the 
test program: " + e.getMessage());
+                               }
+
+                               Assert.assertNotNull("The test program never 
triggered an execution.",
+                                               this.latestExecutionResult);
+                       }
+
+                       // post-submit
+                       try {
+                               postSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               Assert.fail("Post-submit work caused an error: 
" + e.getMessage());
+                       }
+               } finally {
+                       stopCluster();
+               }
+       }
+       
+       @Test
+       public void testJobCollectionExecution() throws Exception {
+               
+               // check if collection execution should be skipped.
+               if(this.skipCollectionExecution()) {
+                       return;
+               }
+               
+               isCollectionExecution = true;
+               
+               // pre-submit
+               try {
+                       preSubmit();
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail("Pre-submit work caused an error: " + 
e.getMessage());
+               }
+               
+               // prepare the test environment
+               CollectionTestEnvironment env = new CollectionTestEnvironment();
+               env.setAsContext();
+               
+               // call the test program
+               try {
+                       testProgram();
+                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail("Error while calling the test program: " + 
e.getMessage());
+               }
+               
+               Assert.assertNotNull("The test program never triggered an 
execution.", this.latestExecutionResult);
+               
+               // post-submit
+               try {
+                       postSubmit();
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail("Post-submit work caused an error: " + 
e.getMessage());
+               }
+       }
+       
+       public static class TupleComparator<T extends Tuple> implements 
Comparator<T> {
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public int compare(T o1, T o2) {
+                       int a1 = o1.getArity();
+                       int a2 = o2.getArity();
+                       
+                       if (a1 < a2) {
+                               return -1;
+                       } else if (a2 < a1) {
+                               return 1;
+                       } else {
+                               for (int i = 0; i < a1; i++) {
+                                       Object obj1 = o1.getField(i);
+                                       Object obj2 = o2.getField(i);
+                                       
+                                       if (!(obj1 instanceof Comparable && 
obj2 instanceof Comparable)) {
+                                               Assert.fail("Cannot compare 
tuple fields");
+                                       }
+                                       
+                                       int cmp = ((Comparable<Object>) 
obj1).compareTo(obj2);
+                                       if (cmp != 0) {
+                                               return cmp;
+                                       }
+                               }
+                               
+                               return 0;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
new file mode 100644
index 0000000..d7f09bd
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Base class for unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup 
and
+ * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the ExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * }</pre>
+ */
+public class MultipleProgramsTestBase extends TestBaseUtils {
+
+       /**
+        * Enum that defines which execution environment to run the next test 
on:
+        * An embedded local flink cluster, or the collection execution backend.
+        */
+       public enum TestExecutionMode {
+               CLUSTER,
+               COLLECTION
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  The mini cluster that is shared across tests
+       // 
------------------------------------------------------------------------
+
+       protected static final int DEFAULT_PARALLELISM = 4;
+
+       protected static boolean startWebServer = false;
+
+       protected static ForkableFlinkMiniCluster cluster = null;
+       
+       // 
------------------------------------------------------------------------
+       
+       protected final TestExecutionMode mode;
+
+       
+       public MultipleProgramsTestBase(TestExecutionMode mode) {
+               this.mode = mode;
+               
+               switch(mode){
+                       case CLUSTER:
+                               TestEnvironment clusterEnv = new 
TestEnvironment(cluster, 4);
+                               clusterEnv.setAsContext();
+                               break;
+                       case COLLECTION:
+                               CollectionTestEnvironment collectionEnv = new 
CollectionTestEnvironment();
+                               collectionEnv.setAsContext();
+                               break;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Cluster setup & teardown
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               cluster = TestBaseUtils.startCluster(
+                       1,
+                       DEFAULT_PARALLELISM,
+                       startWebServer,
+                       false,
+                       true);
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Parametrization lets the tests run in cluster and collection mode
+       // 
------------------------------------------------------------------------
+       
+       @Parameterized.Parameters(name = "Execution mode = {0}")
+       public static Collection<Object[]> executionModes() {
+               return Arrays.asList(
+                               new Object[] { TestExecutionMode.CLUSTER },
+                               new Object[] { TestExecutionMode.COLLECTION });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/TestBaseUtils.java
new file mode 100644
index 0000000..4dda4cf
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TestBaseUtils extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TestBaseUtils.class);
+
+       protected static final int MINIMUM_HEAP_SIZE_MB = 192;
+
+       protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
+
+       protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
+
+       protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
+
+       protected static FiniteDuration DEFAULT_TIMEOUT = new 
FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+
+       // 
------------------------------------------------------------------------
+       
+       protected static File logDir;
+
+       protected TestBaseUtils(){
+               verifyJvmOptions();
+       }
+       
+       private static void verifyJvmOptions() {
+               long heap = Runtime.getRuntime().maxMemory() >> 20;
+               Assert.assertTrue("Insufficient java heap space " + heap + "mb 
- set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+                               + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
+       }
+       
+       
+       public static ForkableFlinkMiniCluster startCluster(
+               int numTaskManagers,
+               int taskManagerNumSlots,
+               boolean startWebserver,
+               boolean startZooKeeper,
+               boolean singleActorSystem) throws Exception {
+               
+               Configuration config = new Configuration();
+
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
+               
+               config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
startWebserver);
+
+               if (startZooKeeper) {
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+                       config.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               }
+
+               return startCluster(config, singleActorSystem);
+       }
+
+       public static ForkableFlinkMiniCluster startCluster(
+               Configuration config,
+               boolean singleActorSystem) throws Exception {
+
+               logDir = File.createTempFile("TestBaseUtils-logdir", null);
+               Assert.assertTrue("Unable to delete temp file", 
logDir.delete());
+               Assert.assertTrue("Unable to create temp directory", 
logDir.mkdir());
+               Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
+               Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+               config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
TASK_MANAGER_MEMORY_SIZE);
+               
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
+
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
DEFAULT_AKKA_ASK_TIMEOUT + "s");
+               config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);
+
+               config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
+               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
+               
+               config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toString());
+
+               ForkableFlinkMiniCluster cluster =  new 
ForkableFlinkMiniCluster(config, singleActorSystem);
+
+               cluster.start();
+
+               return cluster;
+       }
+
+
+       public static void stopCluster(ForkableFlinkMiniCluster executor, 
FiniteDuration timeout) throws Exception {
+               if (logDir != null) {
+                       FileUtils.deleteDirectory(logDir);
+               }
+               if (executor != null) {
+                       int numUnreleasedBCVars = 0;
+                       int numActiveConnections = 0;
+                       
+                       if (executor.running()) {
+                               List<ActorRef> tms = 
executor.getTaskManagersAsJava();
+                               List<Future<Object>> 
bcVariableManagerResponseFutures = new ArrayList<>();
+                               List<Future<Object>> 
numActiveConnectionsResponseFutures = new ArrayList<>();
+
+                               for (ActorRef tm : tms) {
+                                       
bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
+                                                       
.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
+
+                                       
numActiveConnectionsResponseFutures.add(Patterns.ask(tm, 
TestingTaskManagerMessages
+                                                       
.RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+                               }
+
+                               Future<Iterable<Object>> 
bcVariableManagerFutureResponses = Futures.sequence(
+                                               
bcVariableManagerResponseFutures, TestingUtils.defaultExecutionContext());
+
+                               Iterable<Object> responses = 
Await.result(bcVariableManagerFutureResponses, timeout);
+
+                               for (Object response : responses) {
+                                       numUnreleasedBCVars += 
((TestingTaskManagerMessages
+                                                       
.ResponseBroadcastVariablesWithReferences) response).number();
+                               }
+
+                               Future<Iterable<Object>> 
numActiveConnectionsFutureResponses = Futures.sequence(
+                                               
numActiveConnectionsResponseFutures, TestingUtils.defaultExecutionContext());
+
+                               responses = 
Await.result(numActiveConnectionsFutureResponses, timeout);
+
+                               for (Object response : responses) {
+                                       numActiveConnections += 
((TestingTaskManagerMessages
+                                                       
.ResponseNumActiveConnections) response).number();
+                               }
+                       }
+
+                       executor.stop();
+                       FileSystem.closeAll();
+                       System.gc();
+
+                       Assert.assertEquals("Not all broadcast variables were 
released.", 0, numUnreleasedBCVars);
+                       Assert.assertEquals("Not all TCP connections were 
released.", 0, numActiveConnections);
+               }
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Result Checking
+       // 
--------------------------------------------------------------------------------------------
+
+       public static BufferedReader[] getResultReader(String resultPath) 
throws IOException {
+               return getResultReader(resultPath, new String[]{}, false);
+       }
+
+       public static BufferedReader[] getResultReader(String resultPath, 
String[] excludePrefixes,
+                                                                               
        boolean inOrderOfFiles) throws IOException {
+               File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+
+               if (inOrderOfFiles) {
+                       // sort the files after their name (1, 2, 3, 4)...
+                       // we cannot sort by path, because strings sort by 
prefix
+                       Arrays.sort(files, new Comparator<File>() {
+
+                               @Override
+                               public int compare(File o1, File o2) {
+                                       try {
+                                               int f1 = 
Integer.parseInt(o1.getName());
+                                               int f2 = 
Integer.parseInt(o2.getName());
+                                               return f1 < f2 ? -1 : (f1 > f2 
? 1 : 0);
+                                       }
+                                       catch (NumberFormatException e) {
+                                               throw new RuntimeException("The 
file names are no numbers and cannot be ordered: " +
+                                                               o1.getName() + 
"/" + o2.getName());
+                                       }
+                               }
+                       });
+               }
+
+               BufferedReader[] readers = new BufferedReader[files.length];
+               for (int i = 0; i < files.length; i++) {
+                       readers[i] = new BufferedReader(new 
FileReader(files[i]));
+               }
+               return readers;
+       }
+       
+       public static BufferedInputStream[] getResultInputStream(String 
resultPath) throws IOException {
+               return getResultInputStream(resultPath, new String[]{});
+       }
+
+       public static BufferedInputStream[] getResultInputStream(String 
resultPath, String[]
+                       excludePrefixes) throws IOException {
+               File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+               BufferedInputStream[] inStreams = new 
BufferedInputStream[files.length];
+               for (int i = 0; i < files.length; i++) {
+                       inStreams[i] = new BufferedInputStream(new 
FileInputStream(files[i]));
+               }
+               return inStreams;
+       }
+
+       public static void readAllResultLines(List<String> target, String 
resultPath) throws IOException {
+               readAllResultLines(target, resultPath, new String[]{});
+       }
+
+       public static void readAllResultLines(List<String> target, String 
resultPath, String[] excludePrefixes) 
+                       throws IOException {
+               
+               readAllResultLines(target, resultPath, excludePrefixes, false);
+       }
+
+       public static void readAllResultLines(List<String> target, String 
resultPath, 
+                                                                               
        String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+
+               final BufferedReader[] readers = getResultReader(resultPath, 
excludePrefixes, inOrderOfFiles);
+               try {
+                       for (BufferedReader reader : readers) {
+                               String s;
+                               while ((s = reader.readLine()) != null) {
+                                       target.add(s);
+                               }
+                       }
+               }
+               finally {
+                       for (BufferedReader reader : readers) {
+                               try {
+                                       reader.close();
+                               }
+                               catch (Exception e) {
+                                       // ignore, this is best-effort cleanup
+                               }
+                       }
+               }
+       }
+
+       public static void compareResultsByLinesInMemory(String 
expectedResultStr, String resultPath) throws Exception {
+               compareResultsByLinesInMemory(expectedResultStr, resultPath, 
new String[0]);
+       }
+
+       public static void compareResultsByLinesInMemory(String 
expectedResultStr, String resultPath,
+                                                                               
        String[] excludePrefixes) throws Exception {
+               ArrayList<String> list = new ArrayList<>();
+               readAllResultLines(list, resultPath, excludePrefixes, false);
+
+               String[] result = list.toArray(new String[list.size()]);
+               Arrays.sort(result);
+
+               String[] expected = expectedResultStr.isEmpty() ? new String[0] 
: expectedResultStr.split("\n");
+               Arrays.sort(expected);
+
+               Assert.assertEquals("Different number of lines in expected and 
obtained result.", expected.length, result.length);
+               Assert.assertArrayEquals(expected, result);
+       }
+
+       public static void compareResultsByLinesInMemoryWithStrictOrder(String 
expectedResultStr,
+                                                                               
                                                        String resultPath) 
throws Exception {
+               compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, 
resultPath, new String[]{});
+       }
+
+       public static void compareResultsByLinesInMemoryWithStrictOrder(String 
expectedResultStr,
+                                                                               
                                                        String resultPath, 
String[] excludePrefixes) throws Exception {
+               ArrayList<String> list = new ArrayList<>();
+               readAllResultLines(list, resultPath, excludePrefixes, true);
+
+               String[] result = list.toArray(new String[list.size()]);
+
+               String[] expected = expectedResultStr.split("\n");
+
+               Assert.assertEquals("Different number of lines in expected and 
obtained result.", expected.length, result.length);
+               Assert.assertArrayEquals(expected, result);
+       }
+
+       public static void checkLinesAgainstRegexp(String resultPath, String 
regexp){
+               Pattern pattern = Pattern.compile(regexp);
+               Matcher matcher = pattern.matcher("");
+
+               ArrayList<String> list = new ArrayList<>();
+               try {
+                       readAllResultLines(list, resultPath, new String[]{}, 
false);
+               } catch (IOException e1) {
+                       Assert.fail("Error reading the result");
+               }
+
+               for (String line : list){
+                       matcher.reset(line);
+                       if (!matcher.find()){
+                               String msg = "Line is not well-formed: " + line;
+                               Assert.fail(msg);
+                       }
+               }
+       }
+
+       public static void compareKeyValuePairsWithDelta(String expectedLines, 
String resultPath,
+                                                                               
                                String delimiter, double maxDelta) throws 
Exception {
+               compareKeyValuePairsWithDelta(expectedLines, resultPath, new 
String[]{}, delimiter, maxDelta);
+       }
+
+       public static void compareKeyValuePairsWithDelta(String expectedLines, 
String resultPath,
+                                                                               
                                String[] excludePrefixes, String delimiter, 
double maxDelta) throws Exception {
+               ArrayList<String> list = new ArrayList<>();
+               readAllResultLines(list, resultPath, excludePrefixes, false);
+
+               String[] result = list.toArray(new String[list.size()]);
+               String[] expected = expectedLines.isEmpty() ? new String[0] : 
expectedLines.split("\n");
+
+               Assert.assertEquals("Wrong number of result lines.", 
expected.length, result.length);
+
+               Arrays.sort(result);
+               Arrays.sort(expected);
+
+               for (int i = 0; i < expected.length; i++) {
+                       String[] expectedFields = expected[i].split(delimiter);
+                       String[] resultFields = result[i].split(delimiter);
+
+                       double expectedPayLoad = 
Double.parseDouble(expectedFields[1]);
+                       double resultPayLoad = 
Double.parseDouble(resultFields[1]);
+
+                       Assert.assertTrue("Values differ by more than the 
permissible delta", Math.abs(expectedPayLoad - resultPayLoad) < maxDelta);
+               }
+       }
+
+       public static <X> void compareResultCollections(List<X> expected, 
List<X> actual,
+                                                                               
        Comparator<X> comparator) {
+               Assert.assertEquals(expected.size(), actual.size());
+
+               Collections.sort(expected, comparator);
+               Collections.sort(actual, comparator);
+
+               for (int i = 0; i < expected.size(); i++) {
+                       Assert.assertEquals(expected.get(i), actual.get(i));
+               }
+       }
+
+       private static File[] getAllInvolvedFiles(String resultPath, String[] 
excludePrefixes) {
+               final String[] exPrefs = excludePrefixes;
+               File result = asFile(resultPath);
+               if (!result.exists()) {
+                       Assert.fail("Result file was not written");
+               }
+               if (result.isDirectory()) {
+                       return result.listFiles(new FilenameFilter() {
+
+                               @Override
+                               public boolean accept(File dir, String name) {
+                                       for(String p: exPrefs) {
+                                               if(name.startsWith(p)) {
+                                                       return false;
+                                               }
+                                       }
+                                       return true;
+                               }
+                       });
+               } else {
+                       return new File[] { result };
+               }
+       }
+
+       protected static File asFile(String path) {
+               try {
+                       URI uri = new URI(path);
+                       if (uri.getScheme().equals("file")) {
+                               return new File(uri.getPath());
+                       } else {
+                               throw new IllegalArgumentException("This path 
does not denote a local file.");
+                       }
+               } catch (URISyntaxException | NullPointerException e) {
+                       throw new IllegalArgumentException("This path does not 
describe a valid local file URI.");
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Comparison methods for tests using collect()
+       // 
--------------------------------------------------------------------------------------------
+
+       public static <T> void compareResultAsTuples(List<T> result, String 
expected) {
+               compareResult(result, expected, true, true);
+       }
+
+       public static <T> void compareResultAsText(List<T> result, String 
expected) {
+               compareResult(result, expected,
+                               false, true);
+       }
+
+       public static <T> void compareOrderedResultAsText(List<T> result, 
String expected) {
+               compareResult(result, expected, false, false);
+       }
+
+       public static <T> void compareOrderedResultAsText(List<T> result, 
String expected, boolean asTuples) {
+               compareResult(result, expected, asTuples, false);
+       }
+       
+       private static <T> void compareResult(List<T> result, String expected, 
boolean asTuples, boolean sort) {
+               String[] expectedStrings = expected.split("\n");
+               String[] resultStrings = new String[result.size()];
+               
+               for (int i = 0; i < resultStrings.length; i++) {
+                       T val = result.get(i);
+                       
+                       if (asTuples) {
+                               if (val instanceof Tuple) {
+                                       Tuple t = (Tuple) val;
+                                       Object first = t.getField(0);
+                                       StringBuilder bld = new 
StringBuilder(first == null ? "null" : first.toString());
+                                       for (int pos = 1; pos < t.getArity(); 
pos++) {
+                                               Object next = t.getField(pos);
+                                               bld.append(',').append(next == 
null ? "null" : next.toString());
+                                       }
+                                       resultStrings[i] = bld.toString();
+                               }
+                               else {
+                                       throw new IllegalArgumentException(val 
+ " is no tuple");
+                               }
+                       }
+                       else {
+                               resultStrings[i] = (val == null) ? "null" : 
val.toString();
+                       }
+               }
+               
+               assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
+
+               if (sort) {
+                       Arrays.sort(expectedStrings);
+                       Arrays.sort(resultStrings);
+               }
+               
+               for (int i = 0; i < expectedStrings.length; i++) {
+                       assertEquals(expectedStrings[i], resultStrings[i]);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       // Comparison methods for tests using sample
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * The expected string contains all expected results separate with line 
break, check whether all elements in result
+        * are contained in the expected string.
+        * @param result The test result.
+        * @param expected The expected string value combination.
+        * @param <T> The result type.
+        */
+       public static <T> void containsResultAsText(List<T> result, String 
expected) {
+               String[] expectedStrings = expected.split("\n");
+               List<String> resultStrings = Lists.newLinkedList();
+
+               for (int i = 0; i < result.size(); i++) {
+                       T val = result.get(i);
+                       String str = (val == null) ? "null" : val.toString();
+                       resultStrings.add(str);
+               }
+
+               List<String> expectedStringList = 
Arrays.asList(expectedStrings);
+
+               for (String element : resultStrings) {
+                       assertTrue(expectedStringList.contains(element));
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Miscellaneous helper methods
+       // 
--------------------------------------------------------------------------------------------
+
+       protected static Collection<Object[]> toParameterList(Configuration ... 
testConfigs) {
+               ArrayList<Object[]> configs = new ArrayList<>();
+               for (Configuration testConfig : testConfigs) {
+                       Object[] c = { testConfig };
+                       configs.add(c);
+               }
+               return configs;
+       }
+
+       protected static Collection<Object[]> 
toParameterList(List<Configuration> testConfigs) {
+               LinkedList<Object[]> configs = new LinkedList<>();
+               for (Configuration testConfig : testConfigs) {
+                       Object[] c = { testConfig };
+                       configs.add(c);
+               }
+               return configs;
+       }
+
+       // This code is taken from: http://stackoverflow.com/a/7201825/568695
+       // it changes the environment variables of this JVM. Use only for 
testing purposes!
+       @SuppressWarnings("unchecked")
+       public static void setEnv(Map<String, String> newenv) {
+               try {
+                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
+                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
+                       theEnvironmentField.setAccessible(true);
+                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
+                       env.putAll(newenv);
+                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+                       theCaseInsensitiveEnvironmentField.setAccessible(true);
+                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
+                       cienv.putAll(newenv);
+               } catch (NoSuchFieldException e) {
+                       try {
+                               Class<?>[] classes = 
Collections.class.getDeclaredClasses();
+                               Map<String, String> env = System.getenv();
+                               for (Class<?> cl : classes) {
+                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                                               Field field = 
cl.getDeclaredField("m");
+                                               field.setAccessible(true);
+                                               Object obj = field.get(env);
+                                               Map<String, String> map = 
(Map<String, String>) obj;
+                                               map.clear();
+                                               map.putAll(newenv);
+                                       }
+                               }
+                       } catch (Exception e2) {
+                               throw new RuntimeException(e2);
+                       }
+               } catch (Exception e1) {
+                       throw new RuntimeException(e1);
+               }
+       }
+       // 
--------------------------------------------------------------------------------------------
+       //  File helper methods
+       // 
--------------------------------------------------------------------------------------------
+
+       protected static void deleteRecursively(File f) throws IOException {
+               if (f.isDirectory()) {
+                       FileUtils.deleteDirectory(f);
+               } else if (!f.delete()) {
+                       System.err.println("Failed to delete file " + 
f.getAbsolutePath());
+               }
+       }
+       
+       public static String constructTestPath(Class<?> forClass, String 
folder) {
+               // we create test path that depends on class to prevent name 
clashes when two tests
+               // create temp files with the same name
+               String path = System.getProperty("java.io.tmpdir");
+               if (!(path.endsWith("/") || path.endsWith("\\")) ) {
+                       path += System.getProperty("file.separator");
+               }
+               path += (forClass.getName() + "-" + folder);
+               return path;
+       }
+       
+       public static String constructTestURI(Class<?> forClass, String folder) 
{
+               return new File(constructTestPath(forClass, 
folder)).toURI().toString();
+       }
+
+       
//---------------------------------------------------------------------------------------------
+       // Web utils
+       
//---------------------------------------------------------------------------------------------
+
+       public static String getFromHTTP(String url) throws Exception {
+               URL u = new URL(url);
+               LOG.info("Accessing URL "+url+" as URL: "+u);
+               HttpURLConnection connection = (HttpURLConnection) 
u.openConnection();
+               connection.setConnectTimeout(100000);
+               connection.connect();
+               InputStream is;
+               if(connection.getResponseCode() >= 400) {
+                       // error!
+                       LOG.warn("HTTP Response code when connecting to {} was 
{}", url, connection.getResponseCode());
+                       is = connection.getErrorStream();
+               } else {
+                       is = connection.getInputStream();
+               }
+
+               return IOUtils.toString(is, connection.getContentEncoding() != 
null ? connection.getContentEncoding() : "UTF-8");
+       }
+       
+       public static class TupleComparator<T extends Tuple> implements 
Comparator<T> {
+
+               @Override
+               public int compare(T o1, T o2) {
+                       if (o1 == null || o2 == null) {
+                               throw new IllegalArgumentException("Cannot 
compare null tuples");
+                       }
+                       else if (o1.getArity() != o2.getArity()) {
+                               return o1.getArity() - o2.getArity();
+                       }
+                       else {
+                               for (int i = 0; i < o1.getArity(); i++) {
+                                       Object val1 = o1.getField(i);
+                                       Object val2 = o2.getField(i);
+                                       
+                                       int cmp;
+                                       if (val1 != null && val2 != null) {
+                                               cmp = compareValues(val1, val2);
+                                       }
+                                       else {
+                                               cmp = val1 == null ? (val2 == 
null ? 0 : -1) : 1;
+                                       }
+                                       
+                                       if (cmp != 0) {
+                                               return cmp;
+                                       }
+                               }
+                               
+                               return 0;
+                       }
+               }
+               
+               @SuppressWarnings("unchecked")
+               private static <X extends Comparable<X>> int 
compareValues(Object o1, Object o2) {
+                       if (o1 instanceof Comparable && o2 instanceof 
Comparable) {
+                               X c1 = (X) o1;
+                               X c2 = (X) o2;
+                               return c1.compareTo(c2);
+                       }
+                       else {
+                               throw new IllegalArgumentException("Cannot 
compare tuples with non comparable elements");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/java/org/apache/flink/test/util/TestEnvironment.java
 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/TestEnvironment.java
new file mode 100644
index 0000000..7cb88be
--- /dev/null
+++ 
b/flink-test-utils/src/test/java/org/apache/flink/test/util/TestEnvironment.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class TestEnvironment extends ExecutionEnvironment {
+
+       private final ForkableFlinkMiniCluster executor;
+
+       private TestEnvironment lastEnv = null;
+
+       @Override
+       public JobExecutionResult getLastJobExecutionResult() {
+               if (lastEnv == null) {
+                       return this.lastJobExecutionResult;
+               }
+               else {
+                       return lastEnv.getLastJobExecutionResult();
+               }
+       }
+
+       public TestEnvironment(ForkableFlinkMiniCluster executor, int 
parallelism) {
+               this.executor = executor;
+               setParallelism(parallelism);
+
+               // disabled to improve build time
+               getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
+       }
+
+       public TestEnvironment(ForkableFlinkMiniCluster executor, int 
parallelism, boolean isObjectReuseEnabled) {
+               this(executor, parallelism);
+
+               if (isObjectReuseEnabled) {
+                       getConfig().enableObjectReuse();
+               } else {
+                       getConfig().disableObjectReuse();
+               }
+       }
+
+       @Override
+       public void startNewSession() throws Exception {
+       }
+
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               OptimizedPlan op = compileProgram(jobName);
+
+               JobGraphGenerator jgg = new JobGraphGenerator();
+               JobGraph jobGraph = jgg.compileJobGraph(op);
+
+               this.lastJobExecutionResult = 
executor.submitJobAndWait(jobGraph, false);
+               return this.lastJobExecutionResult;
+       }
+
+
+       @Override
+       public String getExecutionPlan() throws Exception {
+               OptimizedPlan op = compileProgram("unused");
+
+               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+               return jsonGen.getOptimizerPlanAsJSON(op);
+       }
+
+
+       private OptimizedPlan compileProgram(String jobName) {
+               Plan p = createProgramPlan(jobName);
+
+               Optimizer pc = new Optimizer(new DataStatistics(), 
this.executor.configuration());
+               return pc.compile(p);
+       }
+
+       public void setAsContext() {
+               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
+                       @Override
+                       public ExecutionEnvironment 
createExecutionEnvironment() {
+                               lastEnv = new TestEnvironment(executor, 
getParallelism(), getConfig().isObjectReuseEnabled());
+                               return lastEnv;
+                       }
+               };
+
+               initializeContextEnvironment(factory);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala
 
b/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala
new file mode 100644
index 0000000..715bc55
--- /dev/null
+++ 
b/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import org.scalatest.{Suite, BeforeAndAfter}
+
+/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala 
based tests.
+  * Additionally a TestEnvironment with the started cluster is created and set 
as the default
+  * [[org.apache.flink.api.java.ExecutionEnvironment]].
+  *
+  * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a 
number of slots given
+  * by parallelism. This value can be overridden in a sub class in order to 
start the cluster
+  * with a different number of slots.
+  *
+  * The cluster is started once before starting the tests and is re-used for 
the individual tests.
+  * After all tests have been executed, the cluster is shutdown.
+  *
+  * The cluster is used by obtaining the default 
[[org.apache.flink.api.java.ExecutionEnvironment]].
+  *
+  * @example
+  *          {{{
+  *            def testSomething: Unit = {
+  *             // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+  *             val env = ExecutionEnvironment.getExecutionEnvironment
+  *
+  *             env.fromCollection(...)
+  *
+  *             env.execute
+  *            }
+  *          }}}
+  *
+  */
+trait FlinkTestBase extends BeforeAndAfter {
+  that: Suite =>
+
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  before {
+    val cl = TestBaseUtils.startCluster(
+      1,
+      parallelism,
+      false,
+      false,
+      true)
+
+    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    clusterEnvironment.setAsContext()
+
+    cluster = Some(cl)
+  }
+
+  after {
+    cluster.map(c => TestBaseUtils.stopCluster(c, 
TestBaseUtils.DEFAULT_TIMEOUT))
+  }
+
+}

Reply via email to