http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/testdata/WordCountData.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/testdata/WordCountData.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/testdata/WordCountData.java
deleted file mode 100644
index 23c649b..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/testdata/WordCountData.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.testdata;
-
-
-/**
- *
- */
-public class WordCountData {
-
-       public static final String TEXT = "Goethe - Faust: Der Tragoedie erster 
Teil\n" + "Prolog im Himmel.\n"
-                       + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
-                       + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
-                       + "Und ihre vorgeschriebne Reise Vollendet sie mit 
Donnergang. Ihr Anblick\n"
-                       + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden 
mag; die unbegreiflich\n"
-                       + "hohen Werke Sind herrlich wie am ersten Tag.\n"
-                       + "GABRIEL: Und schnell und unbegreiflich schnelle 
Dreht sich umher der Erde\n"
-                       + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
-                       + "schaeumt das Meer in breiten Fluessen Am tiefen 
Grund der Felsen auf, Und\n"
-                       + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
-                       + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer 
aufs Land, vom Land\n"
-                       + "aufs Meer, und bilden wuetend eine Kette Der 
tiefsten Wirkung rings umher.\n"
-                       + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
-                       + "deine Boten, Herr, verehren Das sanfte Wandeln 
deines Tags.\n"
-                       + "ZU DREI: Der Anblick gibt den Engeln Staerke, Da 
keiner dich ergruenden\n"
-                       + "mag, Und alle deine hohen Werke Sind herrlich wie am 
ersten Tag.\n"
-                       + "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder 
nahst Und fragst, wie\n"
-                       + "alles sich bei uns befinde, Und du mich sonst 
gewoehnlich gerne sahst, So\n"
-                       + "siehst du mich auch unter dem Gesinde. Verzeih, ich 
kann nicht hohe Worte\n"
-                       + "machen, Und wenn mich auch der ganze Kreis 
verhoehnt; Mein Pathos braechte\n"
-                       + "dich gewiss zum Lachen, Haettst du dir nicht das 
Lachen abgewoehnt. Von\n"
-                       + "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe 
nur, wie sich die\n"
-                       + "Menschen plagen. Der kleine Gott der Welt bleibt 
stets von gleichem\n"
-                       + "Schlag, Und ist so wunderlich als wie am ersten Tag. 
Ein wenig besser\n"
-                       + "wuerd er leben, Haettst du ihm nicht den Schein des 
Himmelslichts gegeben;\n"
-                       + "Er nennt's Vernunft und braucht's allein, Nur 
tierischer als jedes Tier\n"
-                       + "zu sein. Er scheint mir, mit Verlaub von euer 
Gnaden, Wie eine der\n"
-                       + "langbeinigen Zikaden, Die immer fliegt und fliegend 
springt Und gleich im\n"
-                       + "Gras ihr altes Liedchen singt; Und laeg er nur noch 
immer in dem Grase! In\n"
-                       + "jeden Quark begraebt er seine Nase.\n"
-                       + "DER HERR: Hast du mir weiter nichts zu sagen? Kommst 
du nur immer\n"
-                       + "anzuklagen? Ist auf der Erde ewig dir nichts 
recht?\n"
-                       + "MEPHISTOPHELES: Nein Herr! ich find es dort, wie 
immer, herzlich\n"
-                       + "schlecht. Die Menschen dauern mich in ihren 
Jammertagen, Ich mag sogar\n"
-                       + "die armen selbst nicht plagen.\n" + "DER HERR: 
Kennst du den Faust?\n" + "MEPHISTOPHELES: Den Doktor?\n"
-                       + "DER HERR: Meinen Knecht!\n"
-                       + "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre 
Weise. Nicht irdisch\n"
-                       + "ist des Toren Trank noch Speise. Ihn treibt die 
Gaerung in die Ferne, Er\n"
-                       + "ist sich seiner Tollheit halb bewusst; Vom Himmel 
fordert er die schoensten\n"
-                       + "Sterne Und von der Erde jede hoechste Lust, Und alle 
Naeh und alle Ferne\n"
-                       + "Befriedigt nicht die tiefbewegte Brust.\n"
-                       + "DER HERR: Wenn er mir auch nur verworren dient, So 
werd ich ihn bald in\n"
-                       + "die Klarheit fuehren. Weiss doch der Gaertner, wenn 
das Baeumchen gruent, Das\n"
-                       + "Bluet und Frucht die kuenft'gen Jahre zieren.\n"
-                       + "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch 
verlieren! Wenn Ihr\n"
-                       + "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu 
fuehren.\n"
-                       + "DER HERR: Solang er auf der Erde lebt, So lange sei 
dir's nicht verboten,\n"
-                       + "Es irrt der Mensch so lang er strebt.\n"
-                       + "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten 
Hab ich mich niemals\n"
-                       + "gern befangen. Am meisten lieb ich mir die vollen, 
frischen Wangen. Fuer\n"
-                       + "einem Leichnam bin ich nicht zu Haus; Mir geht es 
wie der Katze mit der Maus.\n"
-                       + "DER HERR: Nun gut, es sei dir ueberlassen! Zieh 
diesen Geist von seinem\n"
-                       + "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, 
Auf deinem Wege mit\n"
-                       + "herab, Und steh beschaemt, wenn du bekennen musst: 
Ein guter Mensch, in\n"
-                       + "seinem dunklen Drange, Ist sich des rechten Weges 
wohl bewusst.\n"
-                       + "MEPHISTOPHELES: Schon gut! nur dauert es nicht 
lange. Mir ist fuer meine\n"
-                       + "Wette gar nicht bange. Wenn ich zu meinem Zweck 
gelange, Erlaubt Ihr mir\n"
-                       + "Triumph aus voller Brust. Staub soll er fressen, und 
mit Lust, Wie meine\n"
-                       + "Muhme, die beruehmte Schlange.\n"
-                       + "DER HERR: Du darfst auch da nur frei erscheinen; Ich 
habe deinesgleichen\n"
-                       + "nie gehasst. Von allen Geistern, die verneinen, ist 
mir der Schalk am\n"
-                       + "wenigsten zur Last. Des Menschen Taetigkeit kann 
allzu leicht erschlaffen,\n"
-                       + "er liebt sich bald die unbedingte Ruh; Drum geb ich 
gern ihm den Gesellen\n"
-                       + "zu, Der reizt und wirkt und muss als Teufel 
schaffen. Doch ihr, die echten\n"
-                       + "Goettersoehne, Erfreut euch der lebendig reichen 
Schoene! Das Werdende, das\n"
-                       + "ewig wirkt und lebt, Umfass euch mit der Liebe 
holden Schranken, Und was\n"
-                       + "in schwankender Erscheinung schwebt, Befestigt mit 
dauernden Gedanken!\n"
-                       + "(Der Himmel schliesst, die Erzengel verteilen 
sich.)\n"
-                       + "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich 
den Alten gern, Und\n"
-                       + "huete mich, mit ihm zu brechen. Es ist gar huebsch 
von einem grossen Herrn,\n"
-                       + "So menschlich mit dem Teufel selbst zu sprechen.";
-
-       public static final String COUNTS = "machen 1\n" + "zeit 2\n" + 
"heerscharen 1\n" + "keiner 2\n" + "meine 3\n"
-                       + "fuehr 1\n" + "triumph 1\n" + "kommst 1\n" + "frei 
1\n" + "schaffen 1\n" + "gesinde 1\n"
-                       + "langbeinigen 1\n" + "schalk 1\n" + "besser 1\n" + 
"solang 1\n" + "meer 4\n" + "fragst 1\n"
-                       + "gabriel 1\n" + "selbst 2\n" + "bin 1\n" + "sich 7\n" 
+ "du 11\n" + "sogar 1\n" + "geht 1\n"
-                       + "immer 4\n" + "mensch 2\n" + "befestigt 1\n" + "lebt 
2\n" + "mag 3\n" + "engeln 2\n" + "breiten 1\n"
-                       + "blitzendes 1\n" + "tags 1\n" + "sie 2\n" + "plagen 
2\n" + "allzu 1\n" + "meisten 1\n" + "o 1\n"
-                       + "pfade 1\n" + "kennst 1\n" + "nichts 3\n" + "gedanken 
1\n" + "befriedigt 1\n" + "mich 6\n" + "s 3\n"
-                       + "es 8\n" + "verneinen 1\n" + "er 13\n" + "gleich 1\n" 
+ "baeumchen 1\n" + "donnergang 1\n"
-                       + "wunderlich 1\n" + "reise 1\n" + "urquell 1\n" + 
"doch 3\n" + "aufs 2\n" + "toten 1\n" + "niemals 1\n"
-                       + "eine 2\n" + "hab 1\n" + "darfst 1\n" + "da 5\n" + 
"gen 1\n" + "einem 2\n" + "teil 1\n" + "das 7\n"
-                       + "speise 1\n" + "wenig 1\n" + "sterne 1\n" + "geb 1\n" 
+ "welten 1\n" + "alle 3\n" + "toent 1\n"
-                       + "gras 1\n" + "felsen 1\n" + "kette 1\n" + "ich 14\n" 
+ "fuer 2\n" + "als 3\n" + "mein 1\n"
-                       + "schoene 1\n" + "verzeih 1\n" + "schwankender 1\n" + 
"wie 9\n" + "menschlich 1\n" + "gaertner 1\n"
-                       + "taetigkeit 1\n" + "bange 1\n" + "liebe 1\n" + "sei 
2\n" + "seh 1\n" + "tollheit 1\n" + "am 6\n"
-                       + "michael 1\n" + "geist 1\n" + "ab 1\n" + "nahst 1\n" 
+ "vollendet 1\n" + "liebt 1\n" + "brausen 1\n"
-                       + "nase 1\n" + "erlaubt 1\n" + "weiss 2\n" + "schnellem 
1\n" + "deinem 1\n" + "gleichem 1\n"
-                       + "gaerung 1\n" + "dauernden 1\n" + "deines 1\n" + 
"vorgeschriebne 1\n" + "irdisch 1\n" + "worte 1\n"
-                       + "verehren 1\n" + "hohen 2\n" + "weise 2\n" + "kuenft 
1\n" + "werdende 1\n" + "wette 2\n" + "wuetend 1\n"
-                       + "erscheinung 1\n" + "gar 2\n" + "verlieren 1\n" + 
"braucht 1\n" + "weiter 1\n" + "trank 1\n"
-                       + "tierischer 1\n" + "wohl 1\n" + "verteilen 1\n" + 
"verhoehnt 1\n" + "schaeumt 1\n" + "himmelslichts 1\n"
-                       + "unbedingte 1\n" + "herzlich 1\n" + "anblick 2\n" + 
"nennt 1\n" + "gruent 1\n" + "bluet 1\n"
-                       + "leichnam 1\n" + "erschlaffen 1\n" + "jammertagen 
1\n" + "zieh 1\n" + "ihm 3\n" + "besondre 1\n"
-                       + "ihn 5\n" + "grossen 1\n" + "vollen 1\n" + "ihr 7\n" 
+ "boten 1\n" + "voller 1\n" + "singt 1\n"
-                       + "muhme 1\n" + "schon 1\n" + "last 1\n" + "kleine 1\n" 
+ "paradieseshelle 1\n" + "nein 1\n" + "echten 1\n"
-                       + "unter 1\n" + "bei 1\n" + "herr 11\n" + "gern 3\n" + 
"sphaerenlauf 1\n" + "stets 1\n" + "ganze 1\n"
-                       + "braechte 1\n" + "fordert 1\n" + "schoensten 1\n" + 
"herrlich 2\n" + "gegeben 1\n" + "allein 2\n"
-                       + "reichen 1\n" + "schauervoller 1\n" + "musst 1\n" + 
"recht 1\n" + "bleibt 1\n" + "pracht 1\n"
-                       + "treibt 1\n" + "befangen 1\n" + "was 2\n" + "menschen 
3\n" + "jede 1\n" + "hohe 1\n" + "tiefsten 1\n"
-                       + "bilden 1\n" + "drum 1\n" + "gibt 2\n" + "guter 1\n" 
+ "fuerwahr 1\n" + "im 3\n" + "grund 1\n" + "in 9\n"
-                       + "hoechste 1\n" + "schliesst 1\n" + "fels 1\n" + "steh 
1\n" + "euer 1\n" + "erster 1\n" + "ersten 3\n"
-                       + "goettersoehne 1\n" + "brechen 1\n" + "tiefen 1\n" + 
"frucht 1\n" + "kreis 1\n" + "siehst 1\n"
-                       + "wege 1\n" + "ist 8\n" + "zikaden 1\n" + "frischen 
1\n" + "ruh 1\n" + "deine 2\n" + "maus 1\n"
-                       + "brudersphaeren 1\n" + "nachher 1\n" + "euch 4\n" + 
"gnaden 1\n" + "anzuklagen 1\n" + "schlange 1\n"
-                       + "staerke 2\n" + "erde 4\n" + "verlaub 1\n" + "sanfte 
1\n" + "holden 1\n" + "sonst 1\n" + "treten 1\n"
-                       + "sahst 1\n" + "alten 1\n" + "um 1\n" + "wieder 1\n" + 
"alter 1\n" + "altes 1\n" + "nun 1\n" + "lieb 1\n"
-                       + "gesellen 1\n" + "erscheinen 1\n" + "wirkt 2\n" + 
"haettst 2\n" + "nur 7\n" + "tiefbewegte 1\n"
-                       + "lachen 2\n" + "drange 1\n" + "schlag 1\n" + "schein 
1\n" + "muss 1\n" + "verworren 1\n" + "weges 1\n"
-                       + "allen 1\n" + "gewoehnlich 1\n" + "alles 1\n" + "halb 
1\n" + "stuerme 1\n" + "springt 1\n" + "sollt 1\n"
-                       + "klarheit 1\n" + "so 6\n" + "erfassen 1\n" + 
"liedchen 1\n" + "prolog 1\n" + "zur 1\n" + "fressen 1\n"
-                       + "zum 1\n" + "faust 2\n" + "erzengel 2\n" + "jahre 
1\n" + "sonn 1\n" + "raphael 1\n" + "land 2\n"
-                       + "lang 1\n" + "gelange 1\n" + "lust 2\n" + "welt 1\n" 
+ "sehe 1\n" + "ihre 1\n" + "jedes 1\n"
-                       + "erfreut 1\n" + "seiner 1\n" + "denn 1\n" + "wandeln 
1\n" + "wechselt 1\n" + "jeden 1\n" + "dort 1\n"
-                       + "schlecht 1\n" + "wenigsten 1\n" + "wuerd 1\n" + 
"schranken 1\n" + "bewusst 2\n" + "seinem 2\n"
-                       + "gehasst 1\n" + "sein 1\n" + "meinem 1\n" + "meinen 
1\n" + "pathos 1\n" + "herrn 1\n" + "lange 2\n"
-                       + "herab 1\n" + "diesen 1\n" + "ihren 1\n" + "beruehmte 
1\n" + "goethe 1\n" + "tag 3\n" + "tier 1\n"
-                       + "quark 1\n" + "dank 1\n" + "seine 1\n" + "teufel 2\n" 
+ "zweck 1\n" + "wenn 7\n" + "soll 1\n"
-                       + "wirkung 1\n" + "erlaubnis 1\n" + "lebendig 1\n" + 
"uns 1\n" + "leicht 1\n" + "gewiss 1\n"
-                       + "schnell 1\n" + "und 29\n" + "gerne 1\n" + "rechten 
1\n" + "umher 2\n" + "vernunft 1\n" + "grase 1\n"
-                       + "nach 1\n" + "leben 1\n" + "gott 1\n" + "der 29\n" + 
"des 5\n" + "doktor 1\n" + "beschaemt 1\n"
-                       + "dreht 1\n" + "habe 1\n" + "sagen 2\n" + "bekennen 
1\n" + "dunklen 1\n" + "wettet 1\n" + "den 9\n"
-                       + "mephistopheles 9\n" + "dem 4\n" + "auch 4\n" + "kann 
2\n" + "armen 1\n" + "mir 9\n" + "strebt 1\n"
-                       + "gut 2\n" + "mit 11\n" + "bald 2\n" + "himmlischen 
1\n" + "himmel 3\n" + "noch 3\n" + "kannst 1\n"
-                       + "deinesgleichen 1\n" + "flammt 1\n" + "ergruenden 
2\n" + "nacht 1\n" + "scheint 1\n" + "ferne 2\n"
-                       + "tragoedie 1\n" + "abgewoehnt 1\n" + "reizt 1\n" + 
"geistern 1\n" + "nicht 10\n" + "sacht 1\n"
-                       + "unbegreiflich 2\n" + "schnelle 1\n" + "einmal 1\n" + 
"werd 1\n" + "werke 2\n" + "begraebt 1\n"
-                       + "knecht 1\n" + "rings 1\n" + "wird 1\n" + "katze 1\n" 
+ "huete 1\n" + "fortgerissen 1\n" + "gebt 1\n"
-                       + "huebsch 1\n" + "hast 1\n" + "irrt 1\n" + "befinde 
1\n" + "sind 2\n" + "fuehren 2\n" + "fliegt 1\n"
-                       + "ewig 3\n" + "brust 2\n" + "sonne 1\n" + "sprechen 
1\n" + "ein 3\n" + "strasse 1\n" + "von 8\n"
-                       + "ueberlassen 1\n" + "dir 4\n" + "vom 3\n" + "zu 11\n" 
+ "schwebt 1\n" + "die 22\n" + "vor 2\n"
-                       + "wangen 1\n" + "wettgesang 1\n" + "donnerschlags 1\n" 
+ "find 1\n" + "dich 3\n" + "umfass 1\n"
-                       + "verboten 1\n" + "laeg 1\n" + "nie 1\n" + "drei 2\n" 
+ "dauern 1\n" + "toren 1\n" + "dauert 1\n"
-                       + "verheeren 1\n" + "fliegend 1\n" + "aus 1\n" + "staub 
1\n" + "fluessen 1\n" + "haus 1\n" + "auf 5\n"
-                       + "dient 2\n" + "tiefer 1\n" + "naeh 1\n" + "zieren 
1\n";
-
-       public static final String STREAMING_COUNTS_AS_TUPLES = "(machen,1)\n" 
+ "(zeit,1)\n" + "(zeit,2)\n" + "(heerscharen,1)\n" + "(keiner,1)\n"
-                       + "(keiner,2)\n" + "(meine,1)\n" + "(meine,2)\n" + 
"(meine,3)\n" + "(fuehr,1)\n" + "(triumph,1)\n" + "(kommst,1)\n"
-                       + "(frei,1)\n" + "(schaffen,1)\n" + "(gesinde,1)\n" + 
"(langbeinigen,1)\n" + "(schalk,1)\n" + "(besser,1)\n" + "(solang,1)\n"
-                       + "(meer,1)\n" + "(meer,2)\n" + "(meer,3)\n" + 
"(meer,4)\n" + "(fragst,1)\n" + "(gabriel,1)\n" + "(selbst,1)\n" + 
"(selbst,2)\n"
-                       + "(bin,1)\n" + "(sich,1)\n" + "(sich,2)\n" + 
"(sich,3)\n" + "(sich,4)\n" + "(sich,5)\n" + "(sich,6)\n" + "(sich,7)\n" + 
"(du,1)\n"
-                       + "(du,2)\n" + "(du,3)\n" + "(du,4)\n" + "(du,5)\n" + 
"(du,6)\n" + "(du,7)\n" + "(du,8)\n" + "(du,9)\n" + "(du,10)\n" + "(du,11)\n"
-                       + "(sogar,1)\n" + "(geht,1)\n" + "(immer,1)\n" + 
"(immer,2)\n" + "(immer,3)\n" + "(immer,4)\n" + "(mensch,1)\n" + "(mensch,2)\n"
-                       + "(befestigt,1)\n" + "(lebt,1)\n" + "(lebt,2)\n" + 
"(mag,1)\n" + "(mag,2)\n" + "(mag,3)\n" + "(engeln,1)\n" + "(engeln,2)\n"
-                       + "(breiten,1)\n" + "(blitzendes,1)\n" + "(tags,1)\n" + 
"(sie,1)\n" + "(sie,2)\n" + "(plagen,1)\n" + "(plagen,2)\n" + "(allzu,1)\n"
-                       + "(meisten,1)\n" + "(o,1)\n" + "(pfade,1)\n" + 
"(kennst,1)\n" + "(nichts,1)\n" + "(nichts,2)\n" + "(nichts,3)\n" + 
"(gedanken,1)\n"
-                       + "(befriedigt,1)\n" + "(mich,1)\n" + "(mich,2)\n" + 
"(mich,3)\n" + "(mich,4)\n" + "(mich,5)\n" + "(mich,6)\n" + "(s,1)\n" + 
"(s,2)\n"
-                       + "(s,3)\n" + "(es,1)\n" + "(es,2)\n" + "(es,3)\n" + 
"(es,4)\n" + "(es,5)\n" + "(es,6)\n" + "(es,7)\n" + "(es,8)\n" + 
"(verneinen,1)\n"
-                       + "(er,1)\n" + "(er,2)\n" + "(er,3)\n" + "(er,4)\n" + 
"(er,5)\n" + "(er,6)\n" + "(er,7)\n" + "(er,8)\n" + "(er,9)\n" + "(er,10)\n"
-                       + "(er,11)\n" + "(er,12)\n" + "(er,13)\n" + 
"(gleich,1)\n" + "(baeumchen,1)\n" + "(donnergang,1)\n" + "(wunderlich,1)\n"
-                       + "(reise,1)\n" + "(urquell,1)\n" + "(doch,1)\n" + 
"(doch,2)\n" + "(doch,3)\n" + "(aufs,1)\n" + "(aufs,2)\n" + "(toten,1)\n"
-                       + "(niemals,1)\n" + "(eine,1)\n" + "(eine,2)\n" + 
"(hab,1)\n" + "(darfst,1)\n" + "(da,1)\n" + "(da,2)\n" + "(da,3)\n" + "(da,4)\n"
-                       + "(da,5)\n" + "(gen,1)\n" + "(einem,1)\n" + 
"(einem,2)\n" + "(teil,1)\n" + "(das,1)\n" + "(das,2)\n" + "(das,3)\n" + 
"(das,4)\n"
-                       + "(das,5)\n" + "(das,6)\n" + "(das,7)\n" + 
"(speise,1)\n" + "(wenig,1)\n" + "(sterne,1)\n" + "(geb,1)\n" + "(welten,1)\n"
-                       + "(alle,1)\n" + "(alle,2)\n" + "(alle,3)\n" + 
"(toent,1)\n" + "(gras,1)\n" + "(felsen,1)\n" + "(kette,1)\n" + "(ich,1)\n"
-                       + "(ich,2)\n" + "(ich,3)\n" + "(ich,4)\n" + "(ich,5)\n" 
+ "(ich,6)\n" + "(ich,7)\n" + "(ich,8)\n" + "(ich,9)\n" + "(ich,10)\n"
-                       + "(ich,11)\n" + "(ich,12)\n" + "(ich,13)\n" + 
"(ich,14)\n" + "(fuer,1)\n" + "(fuer,2)\n" + "(als,1)\n" + "(als,2)\n" + 
"(als,3)\n"
-                       + "(mein,1)\n" + "(schoene,1)\n" + "(verzeih,1)\n" + 
"(schwankender,1)\n" + "(wie,1)\n" + "(wie,2)\n" + "(wie,3)\n" + "(wie,4)\n"
-                       + "(wie,5)\n" + "(wie,6)\n" + "(wie,7)\n" + "(wie,8)\n" 
+ "(wie,9)\n" + "(menschlich,1)\n" + "(gaertner,1)\n" + "(taetigkeit,1)\n"
-                       + "(bange,1)\n" + "(liebe,1)\n" + "(sei,1)\n" + 
"(sei,2)\n" + "(seh,1)\n" + "(tollheit,1)\n" + "(am,1)\n" + "(am,2)\n" + 
"(am,3)\n"
-                       + "(am,4)\n" + "(am,5)\n" + "(am,6)\n" + 
"(michael,1)\n" + "(geist,1)\n" + "(ab,1)\n" + "(nahst,1)\n" + "(vollendet,1)\n"
-                       + "(liebt,1)\n" + "(brausen,1)\n" + "(nase,1)\n" + 
"(erlaubt,1)\n" + "(weiss,1)\n" + "(weiss,2)\n" + "(schnellem,1)\n"
-                       + "(deinem,1)\n" + "(gleichem,1)\n" + "(gaerung,1)\n" + 
"(dauernden,1)\n" + "(deines,1)\n" + "(vorgeschriebne,1)\n"
-                       + "(irdisch,1)\n" + "(worte,1)\n" + "(verehren,1)\n" + 
"(hohen,1)\n" + "(hohen,2)\n" + "(weise,1)\n" + "(weise,2)\n"
-                       + "(kuenft,1)\n" + "(werdende,1)\n" + "(wette,1)\n" + 
"(wette,2)\n" + "(wuetend,1)\n" + "(erscheinung,1)\n" + "(gar,1)\n"
-                       + "(gar,2)\n" + "(verlieren,1)\n" + "(braucht,1)\n" + 
"(weiter,1)\n" + "(trank,1)\n" + "(tierischer,1)\n" + "(wohl,1)\n"
-                       + "(verteilen,1)\n" + "(verhoehnt,1)\n" + 
"(schaeumt,1)\n" + "(himmelslichts,1)\n" + "(unbedingte,1)\n" + "(herzlich,1)\n"
-                       + "(anblick,1)\n" + "(anblick,2)\n" + "(nennt,1)\n" + 
"(gruent,1)\n" + "(bluet,1)\n" + "(leichnam,1)\n" + "(erschlaffen,1)\n"
-                       + "(jammertagen,1)\n" + "(zieh,1)\n" + "(ihm,1)\n" + 
"(ihm,2)\n" + "(ihm,3)\n" + "(besondre,1)\n" + "(ihn,1)\n" + "(ihn,2)\n"
-                       + "(ihn,3)\n" + "(ihn,4)\n" + "(ihn,5)\n" + 
"(grossen,1)\n" + "(vollen,1)\n" + "(ihr,1)\n" + "(ihr,2)\n" + "(ihr,3)\n"
-                       + "(ihr,4)\n" + "(ihr,5)\n" + "(ihr,6)\n" + "(ihr,7)\n" 
+ "(boten,1)\n" + "(voller,1)\n" + "(singt,1)\n" + "(muhme,1)\n"
-                       + "(schon,1)\n" + "(last,1)\n" + "(kleine,1)\n" + 
"(paradieseshelle,1)\n" + "(nein,1)\n" + "(echten,1)\n" + "(unter,1)\n"
-                       + "(bei,1)\n" + "(herr,1)\n" + "(herr,2)\n" + 
"(herr,3)\n" + "(herr,4)\n" + "(herr,5)\n" + "(herr,6)\n" + "(herr,7)\n"
-                       + "(herr,8)\n" + "(herr,9)\n" + "(herr,10)\n" + 
"(herr,11)\n" + "(gern,1)\n" + "(gern,2)\n" + "(gern,3)\n" + 
"(sphaerenlauf,1)\n"
-                       + "(stets,1)\n" + "(ganze,1)\n" + "(braechte,1)\n" + 
"(fordert,1)\n" + "(schoensten,1)\n" + "(herrlich,1)\n" + "(herrlich,2)\n"
-                       + "(gegeben,1)\n" + "(allein,1)\n" + "(allein,2)\n" + 
"(reichen,1)\n" + "(schauervoller,1)\n" + "(musst,1)\n" + "(recht,1)\n"
-                       + "(bleibt,1)\n" + "(pracht,1)\n" + "(treibt,1)\n" + 
"(befangen,1)\n" + "(was,1)\n" + "(was,2)\n" + "(menschen,1)\n"
-                       + "(menschen,2)\n" + "(menschen,3)\n" + "(jede,1)\n" + 
"(hohe,1)\n" + "(tiefsten,1)\n" + "(bilden,1)\n" + "(drum,1)\n"
-                       + "(gibt,1)\n" + "(gibt,2)\n" + "(guter,1)\n" + 
"(fuerwahr,1)\n" + "(im,1)\n" + "(im,2)\n" + "(im,3)\n" + "(grund,1)\n"
-                       + "(in,1)\n" + "(in,2)\n" + "(in,3)\n" + "(in,4)\n" + 
"(in,5)\n" + "(in,6)\n" + "(in,7)\n" + "(in,8)\n" + "(in,9)\n" + 
"(hoechste,1)\n"
-                       + "(schliesst,1)\n" + "(fels,1)\n" + "(steh,1)\n" + 
"(euer,1)\n" + "(erster,1)\n" + "(ersten,1)\n" + "(ersten,2)\n" + "(ersten,3)\n"
-                       + "(goettersoehne,1)\n" + "(brechen,1)\n" + 
"(tiefen,1)\n" + "(frucht,1)\n" + "(kreis,1)\n" + "(siehst,1)\n" + "(wege,1)\n"
-                       + "(ist,1)\n" + "(ist,2)\n" + "(ist,3)\n" + "(ist,4)\n" 
+ "(ist,5)\n" + "(ist,6)\n" + "(ist,7)\n" + "(ist,8)\n" + "(zikaden,1)\n"
-                       + "(frischen,1)\n" + "(ruh,1)\n" + "(deine,1)\n" + 
"(deine,2)\n" + "(maus,1)\n" + "(brudersphaeren,1)\n" + "(nachher,1)\n"
-                       + "(euch,1)\n" + "(euch,2)\n" + "(euch,3)\n" + 
"(euch,4)\n" + "(gnaden,1)\n" + "(anzuklagen,1)\n" + "(schlange,1)\n" + 
"(staerke,1)\n"
-                       + "(staerke,2)\n" + "(erde,1)\n" + "(erde,2)\n" + 
"(erde,3)\n" + "(erde,4)\n" + "(verlaub,1)\n" + "(sanfte,1)\n" + "(holden,1)\n"
-                       + "(sonst,1)\n" + "(treten,1)\n" + "(sahst,1)\n" + 
"(alten,1)\n" + "(um,1)\n" + "(wieder,1)\n" + "(alter,1)\n" + "(altes,1)\n"
-                       + "(nun,1)\n" + "(lieb,1)\n" + "(gesellen,1)\n" + 
"(erscheinen,1)\n" + "(wirkt,1)\n" + "(wirkt,2)\n" + "(haettst,1)\n" + 
"(haettst,2)\n"
-                       + "(nur,1)\n" + "(nur,2)\n" + "(nur,3)\n" + "(nur,4)\n" 
+ "(nur,5)\n" + "(nur,6)\n" + "(nur,7)\n" + "(tiefbewegte,1)\n" + "(lachen,1)\n"
-                       + "(lachen,2)\n" + "(drange,1)\n" + "(schlag,1)\n" + 
"(schein,1)\n" + "(muss,1)\n" + "(verworren,1)\n" + "(weges,1)\n" + 
"(allen,1)\n"
-                       + "(gewoehnlich,1)\n" + "(alles,1)\n" + "(halb,1)\n" + 
"(stuerme,1)\n" + "(springt,1)\n" + "(sollt,1)\n" + "(klarheit,1)\n"
-                       + "(so,1)\n" + "(so,2)\n" + "(so,3)\n" + "(so,4)\n" + 
"(so,5)\n" + "(so,6)\n" + "(erfassen,1)\n" + "(liedchen,1)\n" + "(prolog,1)\n"
-                       + "(zur,1)\n" + "(fressen,1)\n" + "(zum,1)\n" + 
"(faust,1)\n" + "(faust,2)\n" + "(erzengel,1)\n" + "(erzengel,2)\n" + 
"(jahre,1)\n"
-                       + "(sonn,1)\n" + "(raphael,1)\n" + "(land,1)\n" + 
"(land,2)\n" + "(lang,1)\n" + "(gelange,1)\n" + "(lust,1)\n" + "(lust,2)\n"
-                       + "(welt,1)\n" + "(sehe,1)\n" + "(ihre,1)\n" + 
"(jedes,1)\n" + "(erfreut,1)\n" + "(seiner,1)\n" + "(denn,1)\n" + 
"(wandeln,1)\n"
-                       + "(wechselt,1)\n" + "(jeden,1)\n" + "(dort,1)\n" + 
"(schlecht,1)\n" + "(wenigsten,1)\n" + "(wuerd,1)\n" + "(schranken,1)\n"
-                       + "(bewusst,1)\n" + "(bewusst,2)\n" + "(seinem,1)\n" + 
"(seinem,2)\n" + "(gehasst,1)\n" + "(sein,1)\n" + "(meinem,1)\n"
-                       + "(meinen,1)\n" + "(pathos,1)\n" + "(herrn,1)\n" + 
"(lange,1)\n" + "(lange,2)\n" + "(herab,1)\n" + "(diesen,1)\n" + "(ihren,1)\n"
-                       + "(beruehmte,1)\n" + "(goethe,1)\n" + "(tag,1)\n" + 
"(tag,2)\n" + "(tag,3)\n" + "(tier,1)\n" + "(quark,1)\n" + "(dank,1)\n"
-                       + "(seine,1)\n" + "(teufel,1)\n" + "(teufel,2)\n" + 
"(zweck,1)\n" + "(wenn,1)\n" + "(wenn,2)\n" + "(wenn,3)\n" + "(wenn,4)\n"
-                       + "(wenn,5)\n" + "(wenn,6)\n" + "(wenn,7)\n" + 
"(soll,1)\n" + "(wirkung,1)\n" + "(erlaubnis,1)\n" + "(lebendig,1)\n" + 
"(uns,1)\n"
-                       + "(leicht,1)\n" + "(gewiss,1)\n" + "(schnell,1)\n" + 
"(und,1)\n" + "(und,2)\n" + "(und,3)\n" + "(und,4)\n" + "(und,5)\n" + 
"(und,6)\n"
-                       + "(und,7)\n" + "(und,8)\n" + "(und,9)\n" + 
"(und,10)\n" + "(und,11)\n" + "(und,12)\n" + "(und,13)\n" + "(und,14)\n" + 
"(und,15)\n"
-                       + "(und,16)\n" + "(und,17)\n" + "(und,18)\n" + 
"(und,19)\n" + "(und,20)\n" + "(und,21)\n" + "(und,22)\n" + "(und,23)\n" + 
"(und,24)\n"
-                       + "(und,25)\n" + "(und,26)\n" + "(und,27)\n" + 
"(und,28)\n" + "(und,29)\n" + "(gerne,1)\n" + "(rechten,1)\n" + "(umher,1)\n" + 
"(umher,2)\n"
-                       + "(vernunft,1)\n" + "(grase,1)\n" + "(nach,1)\n" + 
"(leben,1)\n" + "(gott,1)\n" + "(der,1)\n" + "(der,2)\n" + "(der,3)\n" + 
"(der,4)\n"
-                       + "(der,5)\n" + "(der,6)\n" + "(der,7)\n" + "(der,8)\n" 
+ "(der,9)\n" + "(der,10)\n" + "(der,11)\n" + "(der,12)\n" + "(der,13)\n"
-                       + "(der,14)\n" + "(der,15)\n" + "(der,16)\n" + 
"(der,17)\n" + "(der,18)\n" + "(der,19)\n" + "(der,20)\n" + "(der,21)\n" + 
"(der,22)\n"
-                       + "(der,23)\n" + "(der,24)\n" + "(der,25)\n" + 
"(der,26)\n" + "(der,27)\n" + "(der,28)\n" + "(der,29)\n" + "(des,1)\n" + 
"(des,2)\n"
-                       + "(des,3)\n" + "(des,4)\n" + "(des,5)\n" + 
"(doktor,1)\n" + "(beschaemt,1)\n" + "(dreht,1)\n" + "(habe,1)\n" + 
"(sagen,1)\n" + "(sagen,2)\n"
-                       + "(bekennen,1)\n" + "(dunklen,1)\n" + "(wettet,1)\n" + 
"(den,1)\n" + "(den,2)\n" + "(den,3)\n" + "(den,4)\n" + "(den,5)\n" + 
"(den,6)\n"
-                       + "(den,7)\n" + "(den,8)\n" + "(den,9)\n" + 
"(mephistopheles,1)\n" + "(mephistopheles,2)\n" + "(mephistopheles,3)\n"
-                       + "(mephistopheles,4)\n" + "(mephistopheles,5)\n" + 
"(mephistopheles,6)\n" + "(mephistopheles,7)\n" + "(mephistopheles,8)\n"
-                       + "(mephistopheles,9)\n" + "(dem,1)\n" + "(dem,2)\n" + 
"(dem,3)\n" + "(dem,4)\n" + "(auch,1)\n" + "(auch,2)\n" + "(auch,3)\n" + 
"(auch,4)\n"
-                       + "(kann,1)\n" + "(kann,2)\n" + "(armen,1)\n" + 
"(mir,1)\n" + "(mir,2)\n" + "(mir,3)\n" + "(mir,4)\n" + "(mir,5)\n" + 
"(mir,6)\n" + "(mir,7)\n"
-                       + "(mir,8)\n" + "(mir,9)\n" + "(strebt,1)\n" + 
"(gut,1)\n" + "(gut,2)\n" + "(mit,1)\n" + "(mit,2)\n" + "(mit,3)\n" + 
"(mit,4)\n" + "(mit,5)\n"
-                       + "(mit,6)\n" + "(mit,7)\n" + "(mit,8)\n" + "(mit,9)\n" 
+ "(mit,10)\n" + "(mit,11)\n" + "(bald,1)\n" + "(bald,2)\n" + 
"(himmlischen,1)\n"
-                       + "(himmel,1)\n" + "(himmel,2)\n" + "(himmel,3)\n" + 
"(noch,1)\n" + "(noch,2)\n" + "(noch,3)\n" + "(kannst,1)\n" + 
"(deinesgleichen,1)\n"
-                       + "(flammt,1)\n" + "(ergruenden,1)\n" + 
"(ergruenden,2)\n" + "(nacht,1)\n" + "(scheint,1)\n" + "(ferne,1)\n" + 
"(ferne,2)\n"
-                       + "(tragoedie,1)\n" + "(abgewoehnt,1)\n" + 
"(reizt,1)\n" + "(geistern,1)\n" + "(nicht,1)\n" + "(nicht,2)\n" + 
"(nicht,3)\n" + "(nicht,4)\n"
-                       + "(nicht,5)\n" + "(nicht,6)\n" + "(nicht,7)\n" + 
"(nicht,8)\n" + "(nicht,9)\n" + "(nicht,10)\n" + "(sacht,1)\n" + 
"(unbegreiflich,1)\n"
-                       + "(unbegreiflich,2)\n" + "(schnelle,1)\n" + 
"(einmal,1)\n" + "(werd,1)\n" + "(werke,1)\n" + "(werke,2)\n" + "(begraebt,1)\n"
-                       + "(knecht,1)\n" + "(rings,1)\n" + "(wird,1)\n" + 
"(katze,1)\n" + "(huete,1)\n" + "(fortgerissen,1)\n" + "(gebt,1)\n" + 
"(huebsch,1)\n"
-                       + "(hast,1)\n" + "(irrt,1)\n" + "(befinde,1)\n" + 
"(sind,1)\n" + "(sind,2)\n" + "(fuehren,1)\n" + "(fuehren,2)\n" + "(fliegt,1)\n"
-                       + "(ewig,1)\n" + "(ewig,2)\n" + "(ewig,3)\n" + 
"(brust,1)\n" + "(brust,2)\n" + "(sonne,1)\n" + "(sprechen,1)\n" + "(ein,1)\n" 
+ "(ein,2)\n"
-                       + "(ein,3)\n" + "(strasse,1)\n" + "(von,1)\n" + 
"(von,2)\n" + "(von,3)\n" + "(von,4)\n" + "(von,5)\n" + "(von,6)\n" + 
"(von,7)\n" + "(von,8)\n"
-                       + "(ueberlassen,1)\n" + "(dir,1)\n" + "(dir,2)\n" + 
"(dir,3)\n" + "(dir,4)\n" + "(vom,1)\n" + "(vom,2)\n" + "(vom,3)\n" + 
"(zu,1)\n" + "(zu,2)\n"
-                       + "(zu,3)\n" + "(zu,4)\n" + "(zu,5)\n" + "(zu,6)\n" + 
"(zu,7)\n" + "(zu,8)\n" + "(zu,9)\n" + "(zu,10)\n" + "(zu,11)\n" + 
"(schwebt,1)\n"
-                       + "(die,1)\n" + "(die,2)\n" + "(die,3)\n" + "(die,4)\n" 
+ "(die,5)\n" + "(die,6)\n" + "(die,7)\n" + "(die,8)\n" + "(die,9)\n" + 
"(die,10)\n"
-                       + "(die,11)\n" + "(die,12)\n" + "(die,13)\n" + 
"(die,14)\n" + "(die,15)\n" + "(die,16)\n" + "(die,17)\n" + "(die,18)\n" + 
"(die,19)\n"
-                       + "(die,20)\n" + "(die,21)\n" + "(die,22)\n" + 
"(vor,1)\n" + "(vor,2)\n" + "(wangen,1)\n" + "(wettgesang,1)\n" + 
"(donnerschlags,1)\n"
-                       + "(find,1)\n" + "(dich,1)\n" + "(dich,2)\n" + 
"(dich,3)\n" + "(umfass,1)\n" + "(verboten,1)\n" + "(laeg,1)\n" + "(nie,1)\n" + 
"(drei,1)\n"
-                       + "(drei,2)\n" + "(dauern,1)\n" + "(toren,1)\n" + 
"(dauert,1)\n" + "(verheeren,1)\n" + "(fliegend,1)\n" + "(aus,1)\n" + 
"(staub,1)\n"
-                       + "(fluessen,1)\n" + "(haus,1)\n" + "(auf,1)\n" + 
"(auf,2)\n" + "(auf,3)\n" + "(auf,4)\n" + "(auf,5)\n" + "(dient,1)\n" + 
"(dient,2)\n"
-                       + "(tiefer,1)\n" + "(naeh,1)\n" + "(zieren,1)\n";
-
-       public static final String COUNTS_AS_TUPLES = "(machen,1)\n" + 
"(zeit,2)\n" + "(heerscharen,1)\n" + "(keiner,2)\n" + "(meine,3)\n"
-                       + "(fuehr,1)\n" + "(triumph,1)\n" + "(kommst,1)\n" + 
"(frei,1)\n" + "(schaffen,1)\n" + "(gesinde,1)\n"
-                       + "(langbeinigen,1)\n" + "(schalk,1)\n" + 
"(besser,1)\n" + "(solang,1)\n" + "(meer,4)\n" + "(fragst,1)\n"
-                       + "(gabriel,1)\n" + "(selbst,2)\n" + "(bin,1)\n" + 
"(sich,7)\n" + "(du,11)\n" + "(sogar,1)\n" + "(geht,1)\n"
-                       + "(immer,4)\n" + "(mensch,2)\n" + "(befestigt,1)\n" + 
"(lebt,2)\n" + "(mag,3)\n" + "(engeln,2)\n" + "(breiten,1)\n"
-                       + "(blitzendes,1)\n" + "(tags,1)\n" + "(sie,2)\n" + 
"(plagen,2)\n" + "(allzu,1)\n" + "(meisten,1)\n" + "(o,1)\n"
-                       + "(pfade,1)\n" + "(kennst,1)\n" + "(nichts,3)\n" + 
"(gedanken,1)\n" + "(befriedigt,1)\n" + "(mich,6)\n" + "(s,3)\n"
-                       + "(es,8)\n" + "(verneinen,1)\n" + "(er,13)\n" + 
"(gleich,1)\n" + "(baeumchen,1)\n" + "(donnergang,1)\n"
-                       + "(wunderlich,1)\n" + "(reise,1)\n" + "(urquell,1)\n" 
+ "(doch,3)\n" + "(aufs,2)\n" + "(toten,1)\n" + "(niemals,1)\n"
-                       + "(eine,2)\n" + "(hab,1)\n" + "(darfst,1)\n" + 
"(da,5)\n" + "(gen,1)\n" + "(einem,2)\n" + "(teil,1)\n" + "(das,7)\n"
-                       + "(speise,1)\n" + "(wenig,1)\n" + "(sterne,1)\n" + 
"(geb,1)\n" + "(welten,1)\n" + "(alle,3)\n" + "(toent,1)\n"
-                       + "(gras,1)\n" + "(felsen,1)\n" + "(kette,1)\n" + 
"(ich,14)\n" + "(fuer,2)\n" + "(als,3)\n" + "(mein,1)\n"
-                       + "(schoene,1)\n" + "(verzeih,1)\n" + 
"(schwankender,1)\n" + "(wie,9)\n" + "(menschlich,1)\n" + "(gaertner,1)\n"
-                       + "(taetigkeit,1)\n" + "(bange,1)\n" + "(liebe,1)\n" + 
"(sei,2)\n" + "(seh,1)\n" + "(tollheit,1)\n" + "(am,6)\n"
-                       + "(michael,1)\n" + "(geist,1)\n" + "(ab,1)\n" + 
"(nahst,1)\n" + "(vollendet,1)\n" + "(liebt,1)\n" + "(brausen,1)\n"
-                       + "(nase,1)\n" + "(erlaubt,1)\n" + "(weiss,2)\n" + 
"(schnellem,1)\n" + "(deinem,1)\n" + "(gleichem,1)\n"
-                       + "(gaerung,1)\n" + "(dauernden,1)\n" + "(deines,1)\n" 
+ "(vorgeschriebne,1)\n" + "(irdisch,1)\n" + "(worte,1)\n"
-                       + "(verehren,1)\n" + "(hohen,2)\n" + "(weise,2)\n" + 
"(kuenft,1)\n" + "(werdende,1)\n" + "(wette,2)\n" + "(wuetend,1)\n"
-                       + "(erscheinung,1)\n" + "(gar,2)\n" + "(verlieren,1)\n" 
+ "(braucht,1)\n" + "(weiter,1)\n" + "(trank,1)\n"
-                       + "(tierischer,1)\n" + "(wohl,1)\n" + "(verteilen,1)\n" 
+ "(verhoehnt,1)\n" + "(schaeumt,1)\n" + "(himmelslichts,1)\n"
-                       + "(unbedingte,1)\n" + "(herzlich,1)\n" + 
"(anblick,2)\n" + "(nennt,1)\n" + "(gruent,1)\n" + "(bluet,1)\n"
-                       + "(leichnam,1)\n" + "(erschlaffen,1)\n" + 
"(jammertagen,1)\n" + "(zieh,1)\n" + "(ihm,3)\n" + "(besondre,1)\n"
-                       + "(ihn,5)\n" + "(grossen,1)\n" + "(vollen,1)\n" + 
"(ihr,7)\n" + "(boten,1)\n" + "(voller,1)\n" + "(singt,1)\n"
-                       + "(muhme,1)\n" + "(schon,1)\n" + "(last,1)\n" + 
"(kleine,1)\n" + "(paradieseshelle,1)\n" + "(nein,1)\n" + "(echten,1)\n"
-                       + "(unter,1)\n" + "(bei,1)\n" + "(herr,11)\n" + 
"(gern,3)\n" + "(sphaerenlauf,1)\n" + "(stets,1)\n" + "(ganze,1)\n"
-                       + "(braechte,1)\n" + "(fordert,1)\n" + 
"(schoensten,1)\n" + "(herrlich,2)\n" + "(gegeben,1)\n" + "(allein,2)\n"
-                       + "(reichen,1)\n" + "(schauervoller,1)\n" + 
"(musst,1)\n" + "(recht,1)\n" + "(bleibt,1)\n" + "(pracht,1)\n"
-                       + "(treibt,1)\n" + "(befangen,1)\n" + "(was,2)\n" + 
"(menschen,3)\n" + "(jede,1)\n" + "(hohe,1)\n" + "(tiefsten,1)\n"
-                       + "(bilden,1)\n" + "(drum,1)\n" + "(gibt,2)\n" + 
"(guter,1)\n" + "(fuerwahr,1)\n" + "(im,3)\n" + "(grund,1)\n" + "(in,9)\n"
-                       + "(hoechste,1)\n" + "(schliesst,1)\n" + "(fels,1)\n" + 
"(steh,1)\n" + "(euer,1)\n" + "(erster,1)\n" + "(ersten,3)\n"
-                       + "(goettersoehne,1)\n" + "(brechen,1)\n" + 
"(tiefen,1)\n" + "(frucht,1)\n" + "(kreis,1)\n" + "(siehst,1)\n"
-                       + "(wege,1)\n" + "(ist,8)\n" + "(zikaden,1)\n" + 
"(frischen,1)\n" + "(ruh,1)\n" + "(deine,2)\n" + "(maus,1)\n"
-                       + "(brudersphaeren,1)\n" + "(nachher,1)\n" + 
"(euch,4)\n" + "(gnaden,1)\n" + "(anzuklagen,1)\n" + "(schlange,1)\n"
-                       + "(staerke,2)\n" + "(erde,4)\n" + "(verlaub,1)\n" + 
"(sanfte,1)\n" + "(holden,1)\n" + "(sonst,1)\n" + "(treten,1)\n"
-                       + "(sahst,1)\n" + "(alten,1)\n" + "(um,1)\n" + 
"(wieder,1)\n" + "(alter,1)\n" + "(altes,1)\n" + "(nun,1)\n" + "(lieb,1)\n"
-                       + "(gesellen,1)\n" + "(erscheinen,1)\n" + "(wirkt,2)\n" 
+ "(haettst,2)\n" + "(nur,7)\n" + "(tiefbewegte,1)\n"
-                       + "(lachen,2)\n" + "(drange,1)\n" + "(schlag,1)\n" + 
"(schein,1)\n" + "(muss,1)\n" + "(verworren,1)\n" + "(weges,1)\n"
-                       + "(allen,1)\n" + "(gewoehnlich,1)\n" + "(alles,1)\n" + 
"(halb,1)\n" + "(stuerme,1)\n" + "(springt,1)\n" + "(sollt,1)\n"
-                       + "(klarheit,1)\n" + "(so,6)\n" + "(erfassen,1)\n" + 
"(liedchen,1)\n" + "(prolog,1)\n" + "(zur,1)\n" + "(fressen,1)\n"
-                       + "(zum,1)\n" + "(faust,2)\n" + "(erzengel,2)\n" + 
"(jahre,1)\n" + "(sonn,1)\n" + "(raphael,1)\n" + "(land,2)\n"
-                       + "(lang,1)\n" + "(gelange,1)\n" + "(lust,2)\n" + 
"(welt,1)\n" + "(sehe,1)\n" + "(ihre,1)\n" + "(jedes,1)\n"
-                       + "(erfreut,1)\n" + "(seiner,1)\n" + "(denn,1)\n" + 
"(wandeln,1)\n" + "(wechselt,1)\n" + "(jeden,1)\n" + "(dort,1)\n"
-                       + "(schlecht,1)\n" + "(wenigsten,1)\n" + "(wuerd,1)\n" 
+ "(schranken,1)\n" + "(bewusst,2)\n" + "(seinem,2)\n"
-                       + "(gehasst,1)\n" + "(sein,1)\n" + "(meinem,1)\n" + 
"(meinen,1)\n" + "(pathos,1)\n" + "(herrn,1)\n" + "(lange,2)\n"
-                       + "(herab,1)\n" + "(diesen,1)\n" + "(ihren,1)\n" + 
"(beruehmte,1)\n" + "(goethe,1)\n" + "(tag,3)\n" + "(tier,1)\n"
-                       + "(quark,1)\n" + "(dank,1)\n" + "(seine,1)\n" + 
"(teufel,2)\n" + "(zweck,1)\n" + "(wenn,7)\n" + "(soll,1)\n"
-                       + "(wirkung,1)\n" + "(erlaubnis,1)\n" + 
"(lebendig,1)\n" + "(uns,1)\n" + "(leicht,1)\n" + "(gewiss,1)\n"
-                       + "(schnell,1)\n" + "(und,29)\n" + "(gerne,1)\n" + 
"(rechten,1)\n" + "(umher,2)\n" + "(vernunft,1)\n" + "(grase,1)\n"
-                       + "(nach,1)\n" + "(leben,1)\n" + "(gott,1)\n" + 
"(der,29)\n" + "(des,5)\n" + "(doktor,1)\n" + "(beschaemt,1)\n"
-                       + "(dreht,1)\n" + "(habe,1)\n" + "(sagen,2)\n" + 
"(bekennen,1)\n" + "(dunklen,1)\n" + "(wettet,1)\n" + "(den,9)\n"
-                       + "(mephistopheles,9)\n" + "(dem,4)\n" + "(auch,4)\n" + 
"(kann,2)\n" + "(armen,1)\n" + "(mir,9)\n" + "(strebt,1)\n"
-                       + "(gut,2)\n" + "(mit,11)\n" + "(bald,2)\n" + 
"(himmlischen,1)\n" + "(himmel,3)\n" + "(noch,3)\n" + "(kannst,1)\n"
-                       + "(deinesgleichen,1)\n" + "(flammt,1)\n" + 
"(ergruenden,2)\n" + "(nacht,1)\n" + "(scheint,1)\n" + "(ferne,2)\n"
-                       + "(tragoedie,1)\n" + "(abgewoehnt,1)\n" + 
"(reizt,1)\n" + "(geistern,1)\n" + "(nicht,10)\n" + "(sacht,1)\n"
-                       + "(unbegreiflich,2)\n" + "(schnelle,1)\n" + 
"(einmal,1)\n" + "(werd,1)\n" + "(werke,2)\n" + "(begraebt,1)\n"
-                       + "(knecht,1)\n" + "(rings,1)\n" + "(wird,1)\n" + 
"(katze,1)\n" + "(huete,1)\n" + "(fortgerissen,1)\n" + "(gebt,1)\n"
-                       + "(huebsch,1)\n" + "(hast,1)\n" + "(irrt,1)\n" + 
"(befinde,1)\n" + "(sind,2)\n" + "(fuehren,2)\n" + "(fliegt,1)\n"
-                       + "(ewig,3)\n" + "(brust,2)\n" + "(sonne,1)\n" + 
"(sprechen,1)\n" + "(ein,3)\n" + "(strasse,1)\n" + "(von,8)\n"
-                       + "(ueberlassen,1)\n" + "(dir,4)\n" + "(vom,3)\n" + 
"(zu,11)\n" + "(schwebt,1)\n" + "(die,22)\n" + "(vor,2)\n"
-                       + "(wangen,1)\n" + "(wettgesang,1)\n" + 
"(donnerschlags,1)\n" + "(find,1)\n" + "(dich,3)\n" + "(umfass,1)\n"
-                       + "(verboten,1)\n" + "(laeg,1)\n" + "(nie,1)\n" + 
"(drei,2)\n" + "(dauern,1)\n" + "(toren,1)\n" + "(dauert,1)\n"
-                       + "(verheeren,1)\n" + "(fliegend,1)\n" + "(aus,1)\n" + 
"(staub,1)\n" + "(fluessen,1)\n" + "(haus,1)\n" + "(auf,5)\n"
-                       + "(dient,2)\n" + "(tiefer,1)\n" + "(naeh,1)\n" + 
"(zieren,1)\n";
-
-       private WordCountData() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
deleted file mode 100644
index c2da691..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.configuration.Configuration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-
-/**
- * A base class for tests that run test programs in a Flink mini cluster.
- */
-public abstract class AbstractTestBase extends TestBaseUtils {
-       
-       /** Configuration to start the testing cluster with */
-       protected final Configuration config;
-       
-       private final List<File> tempFiles;
-       
-       private final FiniteDuration timeout;
-
-       protected int taskManagerNumSlots = 1;
-
-       protected int numTaskManagers = 1;
-       
-       /** The mini cluster that runs the test programs */
-       protected ForkableFlinkMiniCluster executor;
-       
-
-       public AbstractTestBase(Configuration config) {
-               this.config = Objects.requireNonNull(config);
-               this.tempFiles = new ArrayList<File>();
-
-               timeout = AkkaUtils.getTimeout(config);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Local Test Cluster Life Cycle
-       // 
--------------------------------------------------------------------------------------------
-
-       public void startCluster() throws Exception {
-               this.executor = startCluster(
-                       numTaskManagers,
-                       taskManagerNumSlots,
-                       false,
-                       false,
-                       true);
-       }
-
-       public void stopCluster() throws Exception {
-               stopCluster(executor, timeout);
-               deleteAllTempFiles();
-       }
-
-       //------------------
-       // Accessors
-       //------------------
-
-       public int getTaskManagerNumSlots() {
-               return taskManagerNumSlots;
-       }
-
-       public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-               this.taskManagerNumSlots = taskManagerNumSlots;
-       }
-
-       public int getNumTaskManagers() {
-               return numTaskManagers;
-       }
-
-       public void setNumTaskManagers(int numTaskManagers) {
-               this.numTaskManagers = numTaskManagers;
-       }
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Temporary File Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       public String getTempDirPath(String dirName) throws IOException {
-               File f = createAndRegisterTempFile(dirName);
-               return f.toURI().toString();
-       }
-
-       public String getTempFilePath(String fileName) throws IOException {
-               File f = createAndRegisterTempFile(fileName);
-               return f.toURI().toString();
-       }
-
-       public String createTempFile(String fileName, String contents) throws 
IOException {
-               File f = createAndRegisterTempFile(fileName);
-               Files.write(contents, f, Charsets.UTF_8);
-               return f.toURI().toString();
-       }
-
-       public File createAndRegisterTempFile(String fileName) throws 
IOException {
-               File baseDir = new File(System.getProperty("java.io.tmpdir"));
-               File f = new File(baseDir, this.getClass().getName() + "-" + 
fileName);
-
-               if (f.exists()) {
-                       deleteRecursively(f);
-               }
-
-               File parentToDelete = f;
-               while (true) {
-                       File parent = parentToDelete.getParentFile();
-                       if (parent == null) {
-                               throw new IOException("Missed temp dir while 
traversing parents of a temp file.");
-                       }
-                       if (parent.equals(baseDir)) {
-                               break;
-                       }
-                       parentToDelete = parent;
-               }
-
-               Files.createParentDirs(f);
-               this.tempFiles.add(parentToDelete);
-               return f;
-       }
-
-       private void deleteAllTempFiles() throws IOException {
-               for (File f : this.tempFiles) {
-                       if (f.exists()) {
-                               deleteRecursively(f);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
deleted file mode 100644
index e56c7e8..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-
-public class CollectionTestEnvironment extends CollectionEnvironment {
-
-       private CollectionTestEnvironment lastEnv = null;
-
-       @Override
-       public JobExecutionResult getLastJobExecutionResult() {
-               if (lastEnv == null) {
-                       return this.lastJobExecutionResult;
-               }
-               else {
-                       return lastEnv.getLastJobExecutionResult();
-               }
-       }
-
-       @Override
-       public JobExecutionResult execute() throws Exception {
-               return execute("test job");
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               JobExecutionResult result = super.execute(jobName);
-               this.lastJobExecutionResult = result;
-               return result;
-       }
-
-       protected void setAsContext() {
-               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment 
createExecutionEnvironment() {
-                               lastEnv = new CollectionTestEnvironment();
-                               return lastEnv;
-                       }
-               };
-
-               initializeContextEnvironment(factory);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
deleted file mode 100644
index e639c80..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import java.util.Comparator;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.java.tuple.Tuple;
-
-
-public abstract class JavaProgramTestBase extends AbstractTestBase {
-
-       private static final int DEFAULT_PARALLELISM = 4;
-       
-       private JobExecutionResult latestExecutionResult;
-       
-       private int parallelism = DEFAULT_PARALLELISM;
-
-       /**
-        * The number of times a test should be repeated.
-        *
-        * <p> This is useful for runtime changes, which affect resource 
management. Running certain
-        * tests repeatedly might help to discover resource leaks, race 
conditions etc.
-        */
-       private int numberOfTestRepetitions = 1;
-
-       private boolean isCollectionExecution;
-
-       public JavaProgramTestBase() {
-               this(new Configuration());
-       }
-       
-       public JavaProgramTestBase(Configuration config) {
-               super(config);
-               setTaskManagerNumSlots(parallelism);
-       }
-       
-       public void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-               setTaskManagerNumSlots(parallelism);
-       }
-
-       public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
-               this.numberOfTestRepetitions = numberOfTestRepetitions;
-       }
-       
-       public int getParallelism() {
-               return isCollectionExecution ? 1 : parallelism;
-       }
-       
-       public JobExecutionResult getLatestExecutionResult() {
-               return this.latestExecutionResult;
-       }
-       
-       public boolean isCollectionExecution() {
-               return isCollectionExecution;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Methods to create the test program and for pre- and post- test work
-       // 
--------------------------------------------------------------------------------------------
-
-       protected abstract void testProgram() throws Exception;
-
-       protected void preSubmit() throws Exception {}
-       
-       protected void postSubmit() throws Exception {}
-       
-       protected boolean skipCollectionExecution() {
-               return false;
-       };
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Test entry point
-       // 
--------------------------------------------------------------------------------------------
-
-       @Test
-       public void testJobWithObjectReuse() throws Exception {
-               isCollectionExecution = false;
-               
-               startCluster();
-               try {
-                       // pre-submit
-                       try {
-                               preSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               Assert.fail("Pre-submit work caused an error: " 
+ e.getMessage());
-                       }
-                       
-                       // prepare the test environment
-                       TestEnvironment env = new 
TestEnvironment(this.executor, this.parallelism);
-                       env.getConfig().enableObjectReuse();
-                       env.setAsContext();
-
-                       // Possibly run the test multiple times
-                       for (int i = 0; i < numberOfTestRepetitions; i++) {
-                               // call the test program
-                               try {
-                                       testProgram();
-                                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
-                               }
-                               catch (Exception e) {
-                                       System.err.println(e.getMessage());
-                                       e.printStackTrace();
-                                       Assert.fail("Error while calling the 
test program: " + e.getMessage());
-                               }
-
-                               Assert.assertNotNull("The test program never 
triggered an execution.",
-                                               this.latestExecutionResult);
-                       }
-                       
-                       // post-submit
-                       try {
-                               postSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               Assert.fail("Post-submit work caused an error: 
" + e.getMessage());
-                       }
-               } finally {
-                       stopCluster();
-               }
-       }
-
-       @Test
-       public void testJobWithoutObjectReuse() throws Exception {
-               isCollectionExecution = false;
-
-               startCluster();
-               try {
-                       // pre-submit
-                       try {
-                               preSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               Assert.fail("Pre-submit work caused an error: " 
+ e.getMessage());
-                       }
-
-                       // prepare the test environment
-                       TestEnvironment env = new 
TestEnvironment(this.executor, this.parallelism);
-                       env.getConfig().disableObjectReuse();
-                       env.setAsContext();
-
-                       // Possibly run the test multiple times
-                       for (int i = 0; i < numberOfTestRepetitions; i++) {
-                               // call the test program
-                               try {
-                                       testProgram();
-                                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
-                               }
-                               catch (Exception e) {
-                                       System.err.println(e.getMessage());
-                                       e.printStackTrace();
-                                       Assert.fail("Error while calling the 
test program: " + e.getMessage());
-                               }
-
-                               Assert.assertNotNull("The test program never 
triggered an execution.",
-                                               this.latestExecutionResult);
-                       }
-
-                       // post-submit
-                       try {
-                               postSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               Assert.fail("Post-submit work caused an error: 
" + e.getMessage());
-                       }
-               } finally {
-                       stopCluster();
-               }
-       }
-       
-       @Test
-       public void testJobCollectionExecution() throws Exception {
-               
-               // check if collection execution should be skipped.
-               if(this.skipCollectionExecution()) {
-                       return;
-               }
-               
-               isCollectionExecution = true;
-               
-               // pre-submit
-               try {
-                       preSubmit();
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       Assert.fail("Pre-submit work caused an error: " + 
e.getMessage());
-               }
-               
-               // prepare the test environment
-               CollectionTestEnvironment env = new CollectionTestEnvironment();
-               env.setAsContext();
-               
-               // call the test program
-               try {
-                       testProgram();
-                       this.latestExecutionResult = 
env.getLastJobExecutionResult();
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       Assert.fail("Error while calling the test program: " + 
e.getMessage());
-               }
-               
-               Assert.assertNotNull("The test program never triggered an 
execution.", this.latestExecutionResult);
-               
-               // post-submit
-               try {
-                       postSubmit();
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       Assert.fail("Post-submit work caused an error: " + 
e.getMessage());
-               }
-       }
-       
-       public static class TupleComparator<T extends Tuple> implements 
Comparator<T> {
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public int compare(T o1, T o2) {
-                       int a1 = o1.getArity();
-                       int a2 = o2.getArity();
-                       
-                       if (a1 < a2) {
-                               return -1;
-                       } else if (a2 < a1) {
-                               return 1;
-                       } else {
-                               for (int i = 0; i < a1; i++) {
-                                       Object obj1 = o1.getField(i);
-                                       Object obj2 = o2.getField(i);
-                                       
-                                       if (!(obj1 instanceof Comparable && 
obj2 instanceof Comparable)) {
-                                               Assert.fail("Cannot compare 
tuple fields");
-                                       }
-                                       
-                                       int cmp = ((Comparable<Object>) 
obj1).compareTo(obj2);
-                                       if (cmp != 0) {
-                                               return cmp;
-                                       }
-                               }
-                               
-                               return 0;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
deleted file mode 100644
index d7f09bd..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Base class for unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup 
and
- * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the ExecutionEnvironment from
- * the context:
- *
- * <pre>{@code
- *
- *   {@literal @}Test
- *   public void someTest() {
- *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * }</pre>
- */
-public class MultipleProgramsTestBase extends TestBaseUtils {
-
-       /**
-        * Enum that defines which execution environment to run the next test 
on:
-        * An embedded local flink cluster, or the collection execution backend.
-        */
-       public enum TestExecutionMode {
-               CLUSTER,
-               COLLECTION
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  The mini cluster that is shared across tests
-       // 
------------------------------------------------------------------------
-
-       protected static final int DEFAULT_PARALLELISM = 4;
-
-       protected static boolean startWebServer = false;
-
-       protected static ForkableFlinkMiniCluster cluster = null;
-       
-       // 
------------------------------------------------------------------------
-       
-       protected final TestExecutionMode mode;
-
-       
-       public MultipleProgramsTestBase(TestExecutionMode mode) {
-               this.mode = mode;
-               
-               switch(mode){
-                       case CLUSTER:
-                               TestEnvironment clusterEnv = new 
TestEnvironment(cluster, 4);
-                               clusterEnv.setAsContext();
-                               break;
-                       case COLLECTION:
-                               CollectionTestEnvironment collectionEnv = new 
CollectionTestEnvironment();
-                               collectionEnv.setAsContext();
-                               break;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Cluster setup & teardown
-       // 
------------------------------------------------------------------------
-
-       @BeforeClass
-       public static void setup() throws Exception {
-               cluster = TestBaseUtils.startCluster(
-                       1,
-                       DEFAULT_PARALLELISM,
-                       startWebServer,
-                       false,
-                       true);
-       }
-
-       @AfterClass
-       public static void teardown() throws Exception {
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Parametrization lets the tests run in cluster and collection mode
-       // 
------------------------------------------------------------------------
-       
-       @Parameterized.Parameters(name = "Execution mode = {0}")
-       public static Collection<Object[]> executionModes() {
-               return Arrays.asList(
-                               new Object[] { TestExecutionMode.CLUSTER },
-                               new Object[] { TestExecutionMode.COLLECTION });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
deleted file mode 100644
index 4dda4cf..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ /dev/null
@@ -1,666 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class TestBaseUtils extends TestLogger {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(TestBaseUtils.class);
-
-       protected static final int MINIMUM_HEAP_SIZE_MB = 192;
-
-       protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
-
-       protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
-
-       protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
-
-       protected static FiniteDuration DEFAULT_TIMEOUT = new 
FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
-
-       // 
------------------------------------------------------------------------
-       
-       protected static File logDir;
-
-       protected TestBaseUtils(){
-               verifyJvmOptions();
-       }
-       
-       private static void verifyJvmOptions() {
-               long heap = Runtime.getRuntime().maxMemory() >> 20;
-               Assert.assertTrue("Insufficient java heap space " + heap + "mb 
- set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
-                               + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
-       }
-       
-       
-       public static ForkableFlinkMiniCluster startCluster(
-               int numTaskManagers,
-               int taskManagerNumSlots,
-               boolean startWebserver,
-               boolean startZooKeeper,
-               boolean singleActorSystem) throws Exception {
-               
-               Configuration config = new Configuration();
-
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
-               
-               config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
startWebserver);
-
-               if (startZooKeeper) {
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
-               }
-
-               return startCluster(config, singleActorSystem);
-       }
-
-       public static ForkableFlinkMiniCluster startCluster(
-               Configuration config,
-               boolean singleActorSystem) throws Exception {
-
-               logDir = File.createTempFile("TestBaseUtils-logdir", null);
-               Assert.assertTrue("Unable to delete temp file", 
logDir.delete());
-               Assert.assertTrue("Unable to create temp directory", 
logDir.mkdir());
-               Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
-               Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
-               config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
TASK_MANAGER_MEMORY_SIZE);
-               
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
-
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
DEFAULT_AKKA_ASK_TIMEOUT + "s");
-               config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);
-
-               config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
-               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
-               
-               config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toString());
-
-               ForkableFlinkMiniCluster cluster =  new 
ForkableFlinkMiniCluster(config, singleActorSystem);
-
-               cluster.start();
-
-               return cluster;
-       }
-
-
-       public static void stopCluster(ForkableFlinkMiniCluster executor, 
FiniteDuration timeout) throws Exception {
-               if (logDir != null) {
-                       FileUtils.deleteDirectory(logDir);
-               }
-               if (executor != null) {
-                       int numUnreleasedBCVars = 0;
-                       int numActiveConnections = 0;
-                       
-                       if (executor.running()) {
-                               List<ActorRef> tms = 
executor.getTaskManagersAsJava();
-                               List<Future<Object>> 
bcVariableManagerResponseFutures = new ArrayList<>();
-                               List<Future<Object>> 
numActiveConnectionsResponseFutures = new ArrayList<>();
-
-                               for (ActorRef tm : tms) {
-                                       
bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-                                                       
.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
-                                       
numActiveConnectionsResponseFutures.add(Patterns.ask(tm, 
TestingTaskManagerMessages
-                                                       
.RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
-                               }
-
-                               Future<Iterable<Object>> 
bcVariableManagerFutureResponses = Futures.sequence(
-                                               
bcVariableManagerResponseFutures, TestingUtils.defaultExecutionContext());
-
-                               Iterable<Object> responses = 
Await.result(bcVariableManagerFutureResponses, timeout);
-
-                               for (Object response : responses) {
-                                       numUnreleasedBCVars += 
((TestingTaskManagerMessages
-                                                       
.ResponseBroadcastVariablesWithReferences) response).number();
-                               }
-
-                               Future<Iterable<Object>> 
numActiveConnectionsFutureResponses = Futures.sequence(
-                                               
numActiveConnectionsResponseFutures, TestingUtils.defaultExecutionContext());
-
-                               responses = 
Await.result(numActiveConnectionsFutureResponses, timeout);
-
-                               for (Object response : responses) {
-                                       numActiveConnections += 
((TestingTaskManagerMessages
-                                                       
.ResponseNumActiveConnections) response).number();
-                               }
-                       }
-
-                       executor.stop();
-                       FileSystem.closeAll();
-                       System.gc();
-
-                       Assert.assertEquals("Not all broadcast variables were 
released.", 0, numUnreleasedBCVars);
-                       Assert.assertEquals("Not all TCP connections were 
released.", 0, numActiveConnections);
-               }
-
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Result Checking
-       // 
--------------------------------------------------------------------------------------------
-
-       public static BufferedReader[] getResultReader(String resultPath) 
throws IOException {
-               return getResultReader(resultPath, new String[]{}, false);
-       }
-
-       public static BufferedReader[] getResultReader(String resultPath, 
String[] excludePrefixes,
-                                                                               
        boolean inOrderOfFiles) throws IOException {
-               File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
-
-               if (inOrderOfFiles) {
-                       // sort the files after their name (1, 2, 3, 4)...
-                       // we cannot sort by path, because strings sort by 
prefix
-                       Arrays.sort(files, new Comparator<File>() {
-
-                               @Override
-                               public int compare(File o1, File o2) {
-                                       try {
-                                               int f1 = 
Integer.parseInt(o1.getName());
-                                               int f2 = 
Integer.parseInt(o2.getName());
-                                               return f1 < f2 ? -1 : (f1 > f2 
? 1 : 0);
-                                       }
-                                       catch (NumberFormatException e) {
-                                               throw new RuntimeException("The 
file names are no numbers and cannot be ordered: " +
-                                                               o1.getName() + 
"/" + o2.getName());
-                                       }
-                               }
-                       });
-               }
-
-               BufferedReader[] readers = new BufferedReader[files.length];
-               for (int i = 0; i < files.length; i++) {
-                       readers[i] = new BufferedReader(new 
FileReader(files[i]));
-               }
-               return readers;
-       }
-       
-       public static BufferedInputStream[] getResultInputStream(String 
resultPath) throws IOException {
-               return getResultInputStream(resultPath, new String[]{});
-       }
-
-       public static BufferedInputStream[] getResultInputStream(String 
resultPath, String[]
-                       excludePrefixes) throws IOException {
-               File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
-               BufferedInputStream[] inStreams = new 
BufferedInputStream[files.length];
-               for (int i = 0; i < files.length; i++) {
-                       inStreams[i] = new BufferedInputStream(new 
FileInputStream(files[i]));
-               }
-               return inStreams;
-       }
-
-       public static void readAllResultLines(List<String> target, String 
resultPath) throws IOException {
-               readAllResultLines(target, resultPath, new String[]{});
-       }
-
-       public static void readAllResultLines(List<String> target, String 
resultPath, String[] excludePrefixes) 
-                       throws IOException {
-               
-               readAllResultLines(target, resultPath, excludePrefixes, false);
-       }
-
-       public static void readAllResultLines(List<String> target, String 
resultPath, 
-                                                                               
        String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
-
-               final BufferedReader[] readers = getResultReader(resultPath, 
excludePrefixes, inOrderOfFiles);
-               try {
-                       for (BufferedReader reader : readers) {
-                               String s;
-                               while ((s = reader.readLine()) != null) {
-                                       target.add(s);
-                               }
-                       }
-               }
-               finally {
-                       for (BufferedReader reader : readers) {
-                               try {
-                                       reader.close();
-                               }
-                               catch (Exception e) {
-                                       // ignore, this is best-effort cleanup
-                               }
-                       }
-               }
-       }
-
-       public static void compareResultsByLinesInMemory(String 
expectedResultStr, String resultPath) throws Exception {
-               compareResultsByLinesInMemory(expectedResultStr, resultPath, 
new String[0]);
-       }
-
-       public static void compareResultsByLinesInMemory(String 
expectedResultStr, String resultPath,
-                                                                               
        String[] excludePrefixes) throws Exception {
-               ArrayList<String> list = new ArrayList<>();
-               readAllResultLines(list, resultPath, excludePrefixes, false);
-
-               String[] result = list.toArray(new String[list.size()]);
-               Arrays.sort(result);
-
-               String[] expected = expectedResultStr.isEmpty() ? new String[0] 
: expectedResultStr.split("\n");
-               Arrays.sort(expected);
-
-               Assert.assertEquals("Different number of lines in expected and 
obtained result.", expected.length, result.length);
-               Assert.assertArrayEquals(expected, result);
-       }
-
-       public static void compareResultsByLinesInMemoryWithStrictOrder(String 
expectedResultStr,
-                                                                               
                                                        String resultPath) 
throws Exception {
-               compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, 
resultPath, new String[]{});
-       }
-
-       public static void compareResultsByLinesInMemoryWithStrictOrder(String 
expectedResultStr,
-                                                                               
                                                        String resultPath, 
String[] excludePrefixes) throws Exception {
-               ArrayList<String> list = new ArrayList<>();
-               readAllResultLines(list, resultPath, excludePrefixes, true);
-
-               String[] result = list.toArray(new String[list.size()]);
-
-               String[] expected = expectedResultStr.split("\n");
-
-               Assert.assertEquals("Different number of lines in expected and 
obtained result.", expected.length, result.length);
-               Assert.assertArrayEquals(expected, result);
-       }
-
-       public static void checkLinesAgainstRegexp(String resultPath, String 
regexp){
-               Pattern pattern = Pattern.compile(regexp);
-               Matcher matcher = pattern.matcher("");
-
-               ArrayList<String> list = new ArrayList<>();
-               try {
-                       readAllResultLines(list, resultPath, new String[]{}, 
false);
-               } catch (IOException e1) {
-                       Assert.fail("Error reading the result");
-               }
-
-               for (String line : list){
-                       matcher.reset(line);
-                       if (!matcher.find()){
-                               String msg = "Line is not well-formed: " + line;
-                               Assert.fail(msg);
-                       }
-               }
-       }
-
-       public static void compareKeyValuePairsWithDelta(String expectedLines, 
String resultPath,
-                                                                               
                                String delimiter, double maxDelta) throws 
Exception {
-               compareKeyValuePairsWithDelta(expectedLines, resultPath, new 
String[]{}, delimiter, maxDelta);
-       }
-
-       public static void compareKeyValuePairsWithDelta(String expectedLines, 
String resultPath,
-                                                                               
                                String[] excludePrefixes, String delimiter, 
double maxDelta) throws Exception {
-               ArrayList<String> list = new ArrayList<>();
-               readAllResultLines(list, resultPath, excludePrefixes, false);
-
-               String[] result = list.toArray(new String[list.size()]);
-               String[] expected = expectedLines.isEmpty() ? new String[0] : 
expectedLines.split("\n");
-
-               Assert.assertEquals("Wrong number of result lines.", 
expected.length, result.length);
-
-               Arrays.sort(result);
-               Arrays.sort(expected);
-
-               for (int i = 0; i < expected.length; i++) {
-                       String[] expectedFields = expected[i].split(delimiter);
-                       String[] resultFields = result[i].split(delimiter);
-
-                       double expectedPayLoad = 
Double.parseDouble(expectedFields[1]);
-                       double resultPayLoad = 
Double.parseDouble(resultFields[1]);
-
-                       Assert.assertTrue("Values differ by more than the 
permissible delta", Math.abs(expectedPayLoad - resultPayLoad) < maxDelta);
-               }
-       }
-
-       public static <X> void compareResultCollections(List<X> expected, 
List<X> actual,
-                                                                               
        Comparator<X> comparator) {
-               Assert.assertEquals(expected.size(), actual.size());
-
-               Collections.sort(expected, comparator);
-               Collections.sort(actual, comparator);
-
-               for (int i = 0; i < expected.size(); i++) {
-                       Assert.assertEquals(expected.get(i), actual.get(i));
-               }
-       }
-
-       private static File[] getAllInvolvedFiles(String resultPath, String[] 
excludePrefixes) {
-               final String[] exPrefs = excludePrefixes;
-               File result = asFile(resultPath);
-               if (!result.exists()) {
-                       Assert.fail("Result file was not written");
-               }
-               if (result.isDirectory()) {
-                       return result.listFiles(new FilenameFilter() {
-
-                               @Override
-                               public boolean accept(File dir, String name) {
-                                       for(String p: exPrefs) {
-                                               if(name.startsWith(p)) {
-                                                       return false;
-                                               }
-                                       }
-                                       return true;
-                               }
-                       });
-               } else {
-                       return new File[] { result };
-               }
-       }
-
-       protected static File asFile(String path) {
-               try {
-                       URI uri = new URI(path);
-                       if (uri.getScheme().equals("file")) {
-                               return new File(uri.getPath());
-                       } else {
-                               throw new IllegalArgumentException("This path 
does not denote a local file.");
-                       }
-               } catch (URISyntaxException | NullPointerException e) {
-                       throw new IllegalArgumentException("This path does not 
describe a valid local file URI.");
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Comparison methods for tests using collect()
-       // 
--------------------------------------------------------------------------------------------
-
-       public static <T> void compareResultAsTuples(List<T> result, String 
expected) {
-               compareResult(result, expected, true, true);
-       }
-
-       public static <T> void compareResultAsText(List<T> result, String 
expected) {
-               compareResult(result, expected,
-                               false, true);
-       }
-
-       public static <T> void compareOrderedResultAsText(List<T> result, 
String expected) {
-               compareResult(result, expected, false, false);
-       }
-
-       public static <T> void compareOrderedResultAsText(List<T> result, 
String expected, boolean asTuples) {
-               compareResult(result, expected, asTuples, false);
-       }
-       
-       private static <T> void compareResult(List<T> result, String expected, 
boolean asTuples, boolean sort) {
-               String[] expectedStrings = expected.split("\n");
-               String[] resultStrings = new String[result.size()];
-               
-               for (int i = 0; i < resultStrings.length; i++) {
-                       T val = result.get(i);
-                       
-                       if (asTuples) {
-                               if (val instanceof Tuple) {
-                                       Tuple t = (Tuple) val;
-                                       Object first = t.getField(0);
-                                       StringBuilder bld = new 
StringBuilder(first == null ? "null" : first.toString());
-                                       for (int pos = 1; pos < t.getArity(); 
pos++) {
-                                               Object next = t.getField(pos);
-                                               bld.append(',').append(next == 
null ? "null" : next.toString());
-                                       }
-                                       resultStrings[i] = bld.toString();
-                               }
-                               else {
-                                       throw new IllegalArgumentException(val 
+ " is no tuple");
-                               }
-                       }
-                       else {
-                               resultStrings[i] = (val == null) ? "null" : 
val.toString();
-                       }
-               }
-               
-               assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
-
-               if (sort) {
-                       Arrays.sort(expectedStrings);
-                       Arrays.sort(resultStrings);
-               }
-               
-               for (int i = 0; i < expectedStrings.length; i++) {
-                       assertEquals(expectedStrings[i], resultStrings[i]);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Comparison methods for tests using sample
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * The expected string contains all expected results separate with line 
break, check whether all elements in result
-        * are contained in the expected string.
-        * @param result The test result.
-        * @param expected The expected string value combination.
-        * @param <T> The result type.
-        */
-       public static <T> void containsResultAsText(List<T> result, String 
expected) {
-               String[] expectedStrings = expected.split("\n");
-               List<String> resultStrings = Lists.newLinkedList();
-
-               for (int i = 0; i < result.size(); i++) {
-                       T val = result.get(i);
-                       String str = (val == null) ? "null" : val.toString();
-                       resultStrings.add(str);
-               }
-
-               List<String> expectedStringList = 
Arrays.asList(expectedStrings);
-
-               for (String element : resultStrings) {
-                       assertTrue(expectedStringList.contains(element));
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Miscellaneous helper methods
-       // 
--------------------------------------------------------------------------------------------
-
-       protected static Collection<Object[]> toParameterList(Configuration ... 
testConfigs) {
-               ArrayList<Object[]> configs = new ArrayList<>();
-               for (Configuration testConfig : testConfigs) {
-                       Object[] c = { testConfig };
-                       configs.add(c);
-               }
-               return configs;
-       }
-
-       protected static Collection<Object[]> 
toParameterList(List<Configuration> testConfigs) {
-               LinkedList<Object[]> configs = new LinkedList<>();
-               for (Configuration testConfig : testConfigs) {
-                       Object[] c = { testConfig };
-                       configs.add(c);
-               }
-               return configs;
-       }
-
-       // This code is taken from: http://stackoverflow.com/a/7201825/568695
-       // it changes the environment variables of this JVM. Use only for 
testing purposes!
-       @SuppressWarnings("unchecked")
-       public static void setEnv(Map<String, String> newenv) {
-               try {
-                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
-                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
-                       theEnvironmentField.setAccessible(true);
-                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
-                       env.putAll(newenv);
-                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-                       theCaseInsensitiveEnvironmentField.setAccessible(true);
-                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
-                       cienv.putAll(newenv);
-               } catch (NoSuchFieldException e) {
-                       try {
-                               Class<?>[] classes = 
Collections.class.getDeclaredClasses();
-                               Map<String, String> env = System.getenv();
-                               for (Class<?> cl : classes) {
-                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-                                               Field field = 
cl.getDeclaredField("m");
-                                               field.setAccessible(true);
-                                               Object obj = field.get(env);
-                                               Map<String, String> map = 
(Map<String, String>) obj;
-                                               map.clear();
-                                               map.putAll(newenv);
-                                       }
-                               }
-                       } catch (Exception e2) {
-                               throw new RuntimeException(e2);
-                       }
-               } catch (Exception e1) {
-                       throw new RuntimeException(e1);
-               }
-       }
-       // 
--------------------------------------------------------------------------------------------
-       //  File helper methods
-       // 
--------------------------------------------------------------------------------------------
-
-       protected static void deleteRecursively(File f) throws IOException {
-               if (f.isDirectory()) {
-                       FileUtils.deleteDirectory(f);
-               } else if (!f.delete()) {
-                       System.err.println("Failed to delete file " + 
f.getAbsolutePath());
-               }
-       }
-       
-       public static String constructTestPath(Class<?> forClass, String 
folder) {
-               // we create test path that depends on class to prevent name 
clashes when two tests
-               // create temp files with the same name
-               String path = System.getProperty("java.io.tmpdir");
-               if (!(path.endsWith("/") || path.endsWith("\\")) ) {
-                       path += System.getProperty("file.separator");
-               }
-               path += (forClass.getName() + "-" + folder);
-               return path;
-       }
-       
-       public static String constructTestURI(Class<?> forClass, String folder) 
{
-               return new File(constructTestPath(forClass, 
folder)).toURI().toString();
-       }
-
-       
//---------------------------------------------------------------------------------------------
-       // Web utils
-       
//---------------------------------------------------------------------------------------------
-
-       public static String getFromHTTP(String url) throws Exception {
-               URL u = new URL(url);
-               LOG.info("Accessing URL "+url+" as URL: "+u);
-               HttpURLConnection connection = (HttpURLConnection) 
u.openConnection();
-               connection.setConnectTimeout(100000);
-               connection.connect();
-               InputStream is;
-               if(connection.getResponseCode() >= 400) {
-                       // error!
-                       LOG.warn("HTTP Response code when connecting to {} was 
{}", url, connection.getResponseCode());
-                       is = connection.getErrorStream();
-               } else {
-                       is = connection.getInputStream();
-               }
-
-               return IOUtils.toString(is, connection.getContentEncoding() != 
null ? connection.getContentEncoding() : "UTF-8");
-       }
-       
-       public static class TupleComparator<T extends Tuple> implements 
Comparator<T> {
-
-               @Override
-               public int compare(T o1, T o2) {
-                       if (o1 == null || o2 == null) {
-                               throw new IllegalArgumentException("Cannot 
compare null tuples");
-                       }
-                       else if (o1.getArity() != o2.getArity()) {
-                               return o1.getArity() - o2.getArity();
-                       }
-                       else {
-                               for (int i = 0; i < o1.getArity(); i++) {
-                                       Object val1 = o1.getField(i);
-                                       Object val2 = o2.getField(i);
-                                       
-                                       int cmp;
-                                       if (val1 != null && val2 != null) {
-                                               cmp = compareValues(val1, val2);
-                                       }
-                                       else {
-                                               cmp = val1 == null ? (val2 == 
null ? 0 : -1) : 1;
-                                       }
-                                       
-                                       if (cmp != 0) {
-                                               return cmp;
-                                       }
-                               }
-                               
-                               return 0;
-                       }
-               }
-               
-               @SuppressWarnings("unchecked")
-               private static <X extends Comparable<X>> int 
compareValues(Object o1, Object o2) {
-                       if (o1 instanceof Comparable && o2 instanceof 
Comparable) {
-                               X c1 = (X) o1;
-                               X c2 = (X) o2;
-                               return c1.compareTo(c2);
-                       }
-                       else {
-                               throw new IllegalArgumentException("Cannot 
compare tuples with non comparable elements");
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
deleted file mode 100644
index 7cb88be..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.CodeAnalysisMode;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-public class TestEnvironment extends ExecutionEnvironment {
-
-       private final ForkableFlinkMiniCluster executor;
-
-       private TestEnvironment lastEnv = null;
-
-       @Override
-       public JobExecutionResult getLastJobExecutionResult() {
-               if (lastEnv == null) {
-                       return this.lastJobExecutionResult;
-               }
-               else {
-                       return lastEnv.getLastJobExecutionResult();
-               }
-       }
-
-       public TestEnvironment(ForkableFlinkMiniCluster executor, int 
parallelism) {
-               this.executor = executor;
-               setParallelism(parallelism);
-
-               // disabled to improve build time
-               getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
-       }
-
-       public TestEnvironment(ForkableFlinkMiniCluster executor, int 
parallelism, boolean isObjectReuseEnabled) {
-               this(executor, parallelism);
-
-               if (isObjectReuseEnabled) {
-                       getConfig().enableObjectReuse();
-               } else {
-                       getConfig().disableObjectReuse();
-               }
-       }
-
-       @Override
-       public void startNewSession() throws Exception {
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               OptimizedPlan op = compileProgram(jobName);
-
-               JobGraphGenerator jgg = new JobGraphGenerator();
-               JobGraph jobGraph = jgg.compileJobGraph(op);
-
-               this.lastJobExecutionResult = 
executor.submitJobAndWait(jobGraph, false);
-               return this.lastJobExecutionResult;
-       }
-
-
-       @Override
-       public String getExecutionPlan() throws Exception {
-               OptimizedPlan op = compileProgram("unused");
-
-               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON(op);
-       }
-
-
-       private OptimizedPlan compileProgram(String jobName) {
-               Plan p = createProgramPlan(jobName);
-
-               Optimizer pc = new Optimizer(new DataStatistics(), 
this.executor.configuration());
-               return pc.compile(p);
-       }
-
-       public void setAsContext() {
-               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment 
createExecutionEnvironment() {
-                               lastEnv = new TestEnvironment(executor, 
getParallelism(), getConfig().isObjectReuseEnabled());
-                               return lastEnv;
-                       }
-               };
-
-               initializeContextEnvironment(factory);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
deleted file mode 100644
index 715bc55..0000000
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import org.scalatest.{Suite, BeforeAndAfter}
-
-/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala 
based tests.
-  * Additionally a TestEnvironment with the started cluster is created and set 
as the default
-  * [[org.apache.flink.api.java.ExecutionEnvironment]].
-  *
-  * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a 
number of slots given
-  * by parallelism. This value can be overridden in a sub class in order to 
start the cluster
-  * with a different number of slots.
-  *
-  * The cluster is started once before starting the tests and is re-used for 
the individual tests.
-  * After all tests have been executed, the cluster is shutdown.
-  *
-  * The cluster is used by obtaining the default 
[[org.apache.flink.api.java.ExecutionEnvironment]].
-  *
-  * @example
-  *          {{{
-  *            def testSomething: Unit = {
-  *             // Obtain TestEnvironment with started ForkableFlinkMiniCluster
-  *             val env = ExecutionEnvironment.getExecutionEnvironment
-  *
-  *             env.fromCollection(...)
-  *
-  *             env.execute
-  *            }
-  *          }}}
-  *
-  */
-trait FlinkTestBase extends BeforeAndAfter {
-  that: Suite =>
-
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
-
-  before {
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      false,
-      false,
-      true)
-
-    val clusterEnvironment = new TestEnvironment(cl, parallelism)
-    clusterEnvironment.setAsContext()
-
-    cluster = Some(cl)
-  }
-
-  after {
-    cluster.map(c => TestBaseUtils.stopCluster(c, 
TestBaseUtils.DEFAULT_TIMEOUT))
-  }
-
-}

Reply via email to