http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
deleted file mode 100644
index c32f5da..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class StringArrayWritable implements Writable, 
Comparable<StringArrayWritable> {
-       
-       private String[] array = new String[0];
-       
-       public StringArrayWritable() {
-               super();
-       }
-       
-       public StringArrayWritable(String[] array) {
-               this.array = array;
-       }
-       
-       @Override
-       public void write(DataOutput out) throws IOException {
-               out.writeInt(this.array.length);
-               
-               for(String str : this.array) {
-                       byte[] b = str.getBytes();
-                       out.writeInt(b.length);
-                       out.write(b);
-               }
-       }
-       
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.array = new String[in.readInt()];
-               
-               for(int i = 0; i < this.array.length; i++) {
-                       byte[] b = new byte[in.readInt()];
-                       in.readFully(b);
-                       this.array[i] = new String(b);
-               }
-       }
-       
-       @Override
-       public int compareTo(StringArrayWritable o) {
-               if(this.array.length != o.array.length) {
-                       return this.array.length - o.array.length;
-               }
-               
-               for(int i = 0; i < this.array.length; i++) {
-                       int comp = this.array[i].compareTo(o.array[i]);
-                       if(comp != 0) {
-                               return comp;
-                       }
-               }
-               return 0;
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if(!(obj instanceof StringArrayWritable)) {
-                       return false;
-               }
-               return this.compareTo((StringArrayWritable) obj) == 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
deleted file mode 100644
index 96f844c..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class WritableComparatorTest extends 
ComparatorTestBase<StringArrayWritable> {
-       
-       StringArrayWritable[] data = new StringArrayWritable[]{
-                       new StringArrayWritable(new String[]{}),
-                       new StringArrayWritable(new String[]{""}),
-                       new StringArrayWritable(new String[]{"a","a"}),
-                       new StringArrayWritable(new String[]{"a","b"}),
-                       new StringArrayWritable(new String[]{"c","c"}),
-                       new StringArrayWritable(new String[]{"d","f"}),
-                       new StringArrayWritable(new String[]{"d","m"}),
-                       new StringArrayWritable(new String[]{"z","x"}),
-                       new StringArrayWritable(new String[]{"a","a", "a"})
-       };
-       
-       @Override
-       protected TypeComparator<StringArrayWritable> createComparator(boolean 
ascending) {
-               return new WritableComparator<StringArrayWritable>(ascending, 
StringArrayWritable.class);
-       }
-       
-       @Override
-       protected TypeSerializer<StringArrayWritable> createSerializer() {
-               return new 
WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
-       }
-       
-       @Override
-       protected StringArrayWritable[] getSortedTestData() {
-               return data;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
deleted file mode 100644
index 94e759d..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> 
{
-       @Override
-       protected TypeComparator<WritableID> createComparator(boolean 
ascending) {
-               return new WritableComparator<>(ascending, WritableID.class);
-       }
-
-       @Override
-       protected TypeSerializer<WritableID> createSerializer() {
-               return new WritableSerializer<>(WritableID.class);
-       }
-
-       @Override
-       protected WritableID[] getSortedTestData() {
-               return new WritableID[] {
-                       new WritableID(new UUID(0, 0)),
-                       new WritableID(new UUID(1, 0)),
-                       new WritableID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
deleted file mode 100644
index 4274cf6..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.UUID;
-
-public class WritableID implements WritableComparable<WritableID> {
-       private UUID uuid;
-
-       public WritableID() {
-               this.uuid = UUID.randomUUID();
-       }
-
-       public WritableID(UUID uuid) {
-               this.uuid = uuid;
-       }
-
-       @Override
-       public int compareTo(WritableID o) {
-               return this.uuid.compareTo(o.uuid);
-       }
-
-       @Override
-       public void write(DataOutput dataOutput) throws IOException {
-               dataOutput.writeLong(uuid.getMostSignificantBits());
-               dataOutput.writeLong(uuid.getLeastSignificantBits());
-       }
-
-       @Override
-       public void readFields(DataInput dataInput) throws IOException {
-               this.uuid = new UUID(dataInput.readLong(), 
dataInput.readLong());
-       }
-
-       @Override
-       public String toString() {
-               return uuid.toString();
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               WritableID id = (WritableID) o;
-
-               return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != 
null);
-       }
-
-       @Override
-       public int hashCode() {
-               return uuid != null ? uuid.hashCode() : 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
deleted file mode 100644
index bb5f4d4..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.junit.Test;
-
-public class WritableSerializerTest {
-       
-       @Test
-       public void testStringArrayWritable() {
-               StringArrayWritable[] data = new StringArrayWritable[]{
-                               new StringArrayWritable(new String[]{}),
-                               new StringArrayWritable(new String[]{""}),
-                               new StringArrayWritable(new String[]{"a","a"}),
-                               new StringArrayWritable(new String[]{"a","b"}),
-                               new StringArrayWritable(new String[]{"c","c"}),
-                               new StringArrayWritable(new String[]{"d","f"}),
-                               new StringArrayWritable(new String[]{"d","m"}),
-                               new StringArrayWritable(new String[]{"z","x"}),
-                               new StringArrayWritable(new String[]{"a","a", 
"a"})
-               };
-               
-               WritableTypeInfo<StringArrayWritable> writableTypeInfo = 
(WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
-               WritableSerializer<StringArrayWritable> writableSerializer = 
(WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new 
ExecutionConfig());
-               
-               SerializerTestInstance<StringArrayWritable> testInstance = new 
SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(),
 -1, data);
-               
-               testInstance.testAll();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
deleted file mode 100644
index 2af7730..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> 
{
-       @Override
-       protected TypeSerializer<WritableID> createSerializer() {
-               return new WritableSerializer<>(WritableID.class);
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<WritableID> getTypeClass() {
-               return WritableID.class;
-       }
-
-       @Override
-       protected WritableID[] getTestData() {
-               return new WritableID[] {
-                       new WritableID(new UUID(0, 0)),
-                       new WritableID(new UUID(1, 0)),
-                       new WritableID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
deleted file mode 100644
index 6f7673b..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.flink.api.java.utils.AbstractParameterToolTest;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class HadoopUtilsTest extends AbstractParameterToolTest {
-
-       @Test
-       public void testParamsFromGenericOptionsParser() throws IOException {
-               ParameterTool parameter = 
HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", 
"-DexpectedCount=15"});
-               validate(parameter);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
deleted file mode 100644
index 4d1acb4..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
-
-       public HadoopMapFunctionITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Test
-       public void testNonPassingMapper() throws Exception{
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
-               DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
-                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new NonPassingMapper()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               nonPassingFlatMapDs.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-               env.execute();
-
-               compareResultsByLinesInMemory("\n", resultPath);
-       }
-
-       @Test
-       public void testDataDuplicatingMapper() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
-               DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
-                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new DuplicatingMapper()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               duplicatingFlatMapDs.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-               env.execute();
-
-               String expected = "(1,Hi)\n" + "(1,HI)\n" +
-                               "(2,Hello)\n" + "(2,HELLO)\n" +
-                               "(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
-                               "(4,Hello world, how are you?)\n" + "(4,HELLO 
WORLD, HOW ARE YOU?)\n" +
-                               "(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
-                               "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" 
+
-                               "(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
-                               "(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
-                               "(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
-                               "(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
-                               "(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
-                               "(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
-                               "(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
-                               "(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
-                               "(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
-                               "(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
-                               "(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
-                               "(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
-                               "(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
-                               "(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
-                               "(21,Comment#15)\n" + "(21,COMMENT#15)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testConfigurableMapper() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               JobConf conf = new JobConf();
-               conf.set("my.filterPrefix", "Hello");
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
-               DataSet<Tuple2<IntWritable, Text>> hellos = ds.
-                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new ConfigurableMapper(), conf));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-               env.execute();
-
-               String expected = "(2,Hello)\n" +
-                               "(3,Hello world)\n" +
-                               "(4,Hello world, how are you?)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-       
-
-       
-       public static class NonPassingMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
-               
-               @Override
-               public void map(final IntWritable k, final Text v, 
-                               final OutputCollector<IntWritable, Text> out, 
final Reporter r) throws IOException {
-                       if ( v.toString().contains("bananas") ) {
-                               out.collect(k,v);
-                       }
-               }
-               
-               @Override
-               public void configure(final JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class DuplicatingMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
-               
-               @Override
-               public void map(final IntWritable k, final Text v, 
-                               final OutputCollector<IntWritable, Text> out, 
final Reporter r) throws IOException {
-                       out.collect(k, v);
-                       out.collect(k, new Text(v.toString().toUpperCase()));
-               }
-               
-               @Override
-               public void configure(final JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class ConfigurableMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
-               private String filterPrefix;
-               
-               @Override
-               public void map(IntWritable k, Text v, 
OutputCollector<IntWritable, Text> out, Reporter r)
-                               throws IOException {
-                       if(v.toString().startsWith(filterPrefix)) {
-                               out.collect(k, v);
-                       }
-               }
-               
-               @Override
-               public void configure(JobConf c) {
-                       filterPrefix = c.get("my.filterPrefix");
-               }
-
-               @Override
-               public void close() throws IOException { }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
deleted file mode 100644
index ccc0d82..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import 
org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopMapredITCase extends JavaProgramTestBase {
-       
-       protected String textPath;
-       protected String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               textPath = createTempFile("text.txt", WordCountData.TEXT);
-               resultPath = getTempDirPath("result");
-               this.setParallelism(4);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[]{".", "_"});
-       }
-       
-       @Override
-       protected void testProgram() throws Exception {
-               HadoopMapredCompatWordCount.main(new String[] { textPath, 
resultPath });
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
deleted file mode 100644
index 13d971c..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.hamcrest.core.IsEqual;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopReduceCombineFunctionITCase extends 
MultipleProgramsTestBase {
-
-       public HadoopReduceCombineFunctionITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Test
-       public void testStandardCountingWithCombiner() throws Exception{
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper1());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
-                               groupBy(0).
-                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-                                               new SumReducer(), new 
SumReducer()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               counts.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(0,5)\n"+
-                               "(1,6)\n" +
-                               "(2,6)\n" +
-                               "(3,4)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testUngroupedHadoopReducer() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper2());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
-                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-                                               new SumReducer(), new 
SumReducer()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               sum.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(0,231)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testCombiner() throws Exception {
-               org.junit.Assume.assumeThat(mode, new 
IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER));
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper3());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
-                               groupBy(0).
-                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-                                               new SumReducer(), new 
KeyChangingReducer()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               counts.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(0,5)\n"+
-                               "(1,6)\n" +
-                               "(2,5)\n" +
-                               "(3,5)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testConfigurationViaJobConf() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               JobConf conf = new JobConf();
-               conf.set("my.cntPrefix", "Hello");
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper4());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
-                               groupBy(0).
-                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
-                                               new ConfigurableCntReducer(), 
conf));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               hellos.writeAsText(resultPath);
-               env.execute();
-
-               // return expected result
-               String expected = "(0,0)\n"+
-                               "(1,0)\n" +
-                               "(2,1)\n" +
-                               "(3,1)\n" +
-                               "(4,1)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-       
-       public static class SumReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
-
-               @Override
-               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       
-                       int sum = 0;
-                       while(v.hasNext()) {
-                               sum += v.next().get();
-                       }
-                       out.collect(k, new IntWritable(sum));
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class KeyChangingReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
-
-               @Override
-               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       while(v.hasNext()) {
-                               out.collect(new IntWritable(k.get() % 4), 
v.next());
-                       }
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
-               private String countPrefix;
-               
-               @Override
-               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       int commentCnt = 0;
-                       while(vs.hasNext()) {
-                               String v = vs.next().toString();
-                               if(v.startsWith(this.countPrefix)) {
-                                       commentCnt++;
-                               }
-                       }
-                       out.collect(k, new IntWritable(commentCnt));
-               }
-               
-               @Override
-               public void configure(final JobConf c) { 
-                       this.countPrefix = c.get("my.cntPrefix");
-               }
-
-               @Override
-               public void close() throws IOException { }
-       }
-
-       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
-                       IntWritable>> {
-               private static final long serialVersionUID = 1L;
-               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
-               @Override
-               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
-               throws Exception {
-                       outT.f0 = new IntWritable(v.f0.get() / 6);
-                       outT.f1 = new IntWritable(1);
-                       return outT;
-               }
-       }
-
-       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
-                       IntWritable>> {
-               private static final long serialVersionUID = 1L;
-               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
-               @Override
-               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
-               throws Exception {
-                       outT.f0 = new IntWritable(0);
-                       outT.f1 = v.f0;
-                       return outT;
-               }
-       }
-
-       public static class Mapper3 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, IntWritable>> {
-               private static final long serialVersionUID = 1L;
-               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
-               @Override
-               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
-               throws Exception {
-                       outT.f0 = v.f0;
-                       outT.f1 = new IntWritable(1);
-                       return outT;
-               }
-       }
-
-       public static class Mapper4 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
-               throws Exception {
-                       v.f0 = new IntWritable(v.f0.get() % 5);
-                       return v;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
deleted file mode 100644
index abc0e9c..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
-
-       public HadoopReduceFunctionITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Test
-       public void testStandardGrouping() throws Exception{
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper1());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-                               groupBy(0).
-                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
CommentCntReducer()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               commentCnts.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(0,0)\n"+
-                               "(1,3)\n" +
-                               "(2,5)\n" +
-                               "(3,5)\n" +
-                               "(4,2)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testUngroupedHadoopReducer() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
-
-               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
AllCommentCntReducer()));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               commentCnts.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(42,15)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testConfigurationViaJobConf() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               JobConf conf = new JobConf();
-               conf.set("my.cntPrefix", "Hello");
-
-               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
-                               map(new Mapper2());
-
-               DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
-                               groupBy(0).
-                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
-                                               new ConfigurableCntReducer(), 
conf));
-
-               String resultPath = tempFolder.newFile().toURI().toString();
-
-               helloCnts.writeAsText(resultPath);
-               env.execute();
-
-               String expected = "(0,0)\n"+
-                               "(1,0)\n" +
-                               "(2,1)\n" +
-                               "(3,1)\n" +
-                               "(4,1)\n";
-
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-       
-       public static class CommentCntReducer implements Reducer<IntWritable, 
Text, IntWritable, IntWritable> {
-               
-               @Override
-               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       int commentCnt = 0;
-                       while(vs.hasNext()) {
-                               String v = vs.next().toString();
-                               if(v.startsWith("Comment")) {
-                                       commentCnt++;
-                               }
-                       }
-                       out.collect(k, new IntWritable(commentCnt));
-               }
-               
-               @Override
-               public void configure(final JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class AllCommentCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
-               
-               @Override
-               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       int commentCnt = 0;
-                       while(vs.hasNext()) {
-                               String v = vs.next().toString();
-                               if(v.startsWith("Comment")) {
-                                       commentCnt++;
-                               }
-                       }
-                       out.collect(new IntWritable(42), new 
IntWritable(commentCnt));
-               }
-               
-               @Override
-               public void configure(final JobConf arg0) { }
-
-               @Override
-               public void close() throws IOException { }
-       }
-       
-       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
-               private String countPrefix;
-               
-               @Override
-               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
-                               throws IOException {
-                       int commentCnt = 0;
-                       while(vs.hasNext()) {
-                               String v = vs.next().toString();
-                               if(v.startsWith(this.countPrefix)) {
-                                       commentCnt++;
-                               }
-                       }
-                       out.collect(k, new IntWritable(commentCnt));
-               }
-               
-               @Override
-               public void configure(final JobConf c) { 
-                       this.countPrefix = c.get("my.cntPrefix");
-               }
-
-               @Override
-               public void close() throws IOException { }
-       }
-
-       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
-               throws Exception {
-                       v.f0 = new IntWritable(v.f0.get() / 5);
-                       return v;
-               }
-       }
-
-       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
-               throws Exception {
-                       v.f0 = new IntWritable(v.f0.get() % 5);
-                       return v;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
deleted file mode 100644
index eed6f8f..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-
-public class HadoopTestData {
-
-       public static DataSet<Tuple2<IntWritable, Text>> 
getKVPairDataSet(ExecutionEnvironment env) {
-               
-               List<Tuple2<IntWritable, Text>> data = new 
ArrayList<Tuple2<IntWritable, Text>>();
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new 
Text("Hi")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new 
Text("Hello")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new 
Text("Hello world")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new 
Text("Hello world, how are you?")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new 
Text("I am fine.")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new 
Text("Luke Skywalker")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new 
Text("Comment#1")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new 
Text("Comment#2")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new 
Text("Comment#3")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new 
Text("Comment#4")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new 
Text("Comment#5")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new 
Text("Comment#6")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new 
Text("Comment#7")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new 
Text("Comment#8")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new 
Text("Comment#9")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new 
Text("Comment#10")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new 
Text("Comment#11")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new 
Text("Comment#12")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new 
Text("Comment#13")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new 
Text("Comment#14")));
-               data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new 
Text("Comment#15")));
-               
-               Collections.shuffle(data);
-               
-               return env.fromCollection(data);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
deleted file mode 100644
index ce0143a..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred.example;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
- */
-public class HadoopMapredCompatWordCount {
-       
-       public static void main(String[] args) throws Exception {
-               if (args.length < 2) {
-                       System.err.println("Usage: WordCount <input path> 
<result path>");
-                       return;
-               }
-               
-               final String inputPath = args[0];
-               final String outputPath = args[1];
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // Set up the Hadoop Input Format
-               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, new JobConf());
-               TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), 
new Path(inputPath));
-               
-               // Create a Flink job with it
-               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
-               
-               DataSet<Tuple2<Text, LongWritable>> words = 
-                               text.flatMap(new 
HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
-                                       .groupBy(0).reduceGroup(new 
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new 
Counter(), new Counter()));
-               
-               // Set up Hadoop Output Format
-               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
-                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
-               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
-               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(outputPath));
-               
-               // Output & Execute
-               words.output(hadoopOutputFormat).setParallelism(1);
-               env.execute("Hadoop Compat WordCount");
-       }
-       
-       
-       public static final class Tokenizer implements Mapper<LongWritable, 
Text, Text, LongWritable> {
-
-               @Override
-               public void map(LongWritable k, Text v, OutputCollector<Text, 
LongWritable> out, Reporter rep) 
-                               throws IOException {
-                       // normalize and split the line
-                       String line = v.toString();
-                       String[] tokens = line.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Text(token), new 
LongWritable(1l));
-                               }
-                       }
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-               
-               @Override
-               public void close() throws IOException { }
-               
-       }
-       
-       public static final class Counter implements Reducer<Text, 
LongWritable, Text, LongWritable> {
-
-               @Override
-               public void reduce(Text k, Iterator<LongWritable> vs, 
OutputCollector<Text, LongWritable> out, Reporter rep)
-                               throws IOException {
-                       
-                       long cnt = 0;
-                       while(vs.hasNext()) {
-                               cnt += vs.next().get();
-                       }
-                       out.collect(k, new LongWritable(cnt));
-                       
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-               
-               @Override
-               public void close() throws IOException { }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
deleted file mode 100644
index 524318c..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
-
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class HadoopTupleUnwrappingIteratorTest {
-
-       @Test
-       public void testValueIterator() {
-               
-               HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
-                               new HadoopTupleUnwrappingIterator<IntWritable, 
IntWritable>(new WritableSerializer
-                                               
<IntWritable>(IntWritable.class));
-               
-               // many values
-               
-               ArrayList<Tuple2<IntWritable, IntWritable>> tList = new 
ArrayList<Tuple2<IntWritable, IntWritable>>();
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(1)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(2)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(3)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(4)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(5)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(6)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(7)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(8)));
-               
-               int expectedKey = 1;
-               int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
-               
-               valIt.set(tList.iterator());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               for(int expectedValue : expectedValues) {
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.next().get() == expectedValue);
-                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
-               }
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               
-               // one value
-               
-               tList.clear();
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(2),new IntWritable(10)));
-               
-               expectedKey = 2;
-               expectedValues = new int[]{10};
-               
-               valIt.set(tList.iterator());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               for(int expectedValue : expectedValues) {
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.next().get() == expectedValue);
-                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
-               }
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               
-               // more values
-               
-               tList.clear();
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(10)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(4)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(7)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(9)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(21)));
-               
-               expectedKey = 3;
-               expectedValues = new int[]{10,4,7,9,21};
-               
-               valIt.set(tList.iterator());
-               Assert.assertTrue(valIt.hasNext());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               for(int expectedValue : expectedValues) {
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.hasNext());
-                       Assert.assertTrue(valIt.next().get() == expectedValue);
-                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
-               }
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               
-               // no has next calls
-               
-               tList.clear();
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(5)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(8)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(42)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(-1)));
-               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(0)));
-               
-               expectedKey = 4;
-               expectedValues = new int[]{5,8,42,-1,0};
-               
-               valIt.set(tList.iterator());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-               for(int expectedValue : expectedValues) {
-                       Assert.assertTrue(valIt.next().get() == expectedValue);
-               }
-               try {
-                       valIt.next();
-                       Assert.fail();
-               } catch (NoSuchElementException nsee) {
-                       // expected
-               }
-               Assert.assertFalse(valIt.hasNext());
-               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
deleted file mode 100644
index 698e356..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-       
-       protected String textPath;
-       protected String resultPath;
-       
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               textPath = createTempFile("text.txt", WordCountData.TEXT);
-               resultPath = getTempDirPath("result");
-               this.setParallelism(4);
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[]{".", "_"});
-       }
-       
-       @Override
-       protected void testProgram() throws Exception {
-               WordCount.main(new String[] { textPath, resultPath });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
deleted file mode 100644
index ed83d78..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce.example;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
- */
-@SuppressWarnings("serial")
-public class WordCount {
-       
-       public static void main(String[] args) throws Exception {
-               if (args.length < 2) {
-                       System.err.println("Usage: WordCount <input path> 
<result path>");
-                       return;
-               }
-               
-               final String inputPath = args[0];
-               final String outputPath = args[1];
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // Set up the Hadoop Input Format
-               Job job = Job.getInstance();
-               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, job);
-               TextInputFormat.addInputPath(job, new Path(inputPath));
-               
-               // Create a Flink job with it
-               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
-               
-               // Tokenize the line and convert from Writable "Text" to String 
for better handling
-               DataSet<Tuple2<String, Integer>> words = text.flatMap(new 
Tokenizer());
-               
-               // Sum up the words
-               DataSet<Tuple2<String, Integer>> result = 
words.groupBy(0).aggregate(Aggregations.SUM, 1);
-               
-               // Convert String back to Writable "Text" for use with Hadoop 
Output Format
-               DataSet<Tuple2<Text, IntWritable>> hadoopResult = 
result.map(new HadoopDatatypeMapper());
-               
-               // Set up Hadoop Output Format
-               HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new 
HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, 
IntWritable>(), job);
-               
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator",
 " ");
-               
hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", 
" "); // set the value for both, since this test
-               TextOutputFormat.setOutputPath(job, new Path(outputPath));
-               
-               // Output & Execute
-               hadoopResult.output(hadoopOutputFormat);
-               env.execute("Word Count");
-       }
-       
-       /**
-        * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
-        */
-       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-               
-               @Override
-               public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
-                       // normalize and split the line
-                       String line = value.f1.toString();
-                       String[] tokens = line.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * Converts Java data types to Hadoop Writables.
-        */
-       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-               
-               @Override
-               public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {
-                       return new Tuple2<Text, IntWritable>(new 
Text(value.f0), new IntWritable(value.f1));
-               }
-               
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/pom.xml 
b/flink-batch-connectors/flink-hbase/pom.xml
deleted file mode 100644
index 70a5692..0000000
--- a/flink-batch-connectors/flink-hbase/pom.xml
+++ /dev/null
@@ -1,264 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-batch-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-hbase_2.10</artifactId>
-       <name>flink-hbase</name>
-       <packaging>jar</packaging>
-
-       <properties>
-               <hbase.version>1.2.3</hbase.version>
-       </properties>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <version>2.19.1</version>
-                               <configuration>
-                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
-                                       <forkCount>1</forkCount>
-                               </configuration>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <!-- Disable inherited 
shade-flink because of a problem in the shade plugin -->
-                                               <!-- When enabled you'll run 
into an infinite loop creating the dependency-reduced-pom.xml -->
-                                               <!-- Seems similar to 
https://issues.apache.org/jira/browse/MSHADE-148 -->
-                                               <id>shade-flink</id>
-                                               <phase>none</phase>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-hadoop2</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-
-                       <!--Exclude Guava in order to run the HBaseMiniCluster 
during testing-->
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>com.google.guava</groupId>
-                                       <artifactId>guava</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- HBase server needed for TableOutputFormat -->
-               <!-- TODO implement bulk output format for HBase -->
-               <dependency>
-                       <groupId>org.apache.hbase</groupId>
-                       <artifactId>hbase-server</artifactId>
-                       <version>${hbase.version}</version>
-                       <exclusions>
-                               <!-- Remove unneeded dependency, which is 
conflicting with our jetty-util version. -->
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty-util</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty-sslengine</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jsp-2.1</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jsp-api-2.1</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>servlet-api-2.5</artifactId>
-                               </exclusion>
-                               <!-- The hadoop dependencies are handled 
through flink-shaded-hadoop -->
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       <artifactId>hadoop-common</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       <artifactId>hadoop-auth</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       
<artifactId>hadoop-annotations</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       
<artifactId>hadoop-mapreduce-client-core</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       <artifactId>hadoop-client</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       <artifactId>hadoop-hdfs</artifactId>
-                               </exclusion>
-                               <!-- Bug in hbase annotations, can be removed 
when fixed. See FLINK-2153. -->
-                               <exclusion>
-                                       <groupId>org.apache.hbase</groupId>
-                                       
<artifactId>hbase-annotations</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.apache.hadoop</groupId>
-                                       <artifactId>hadoop-core</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.apache.flink</groupId>
-                                       
<artifactId>flink-shaded-include-yarn_2.10</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- Test dependencies are only available for Hadoop-2. -->
-               <dependency>
-                       <groupId>org.apache.hbase</groupId>
-                       <artifactId>hbase-server</artifactId>
-                       <version>${hbase.version}</version>
-                       <classifier>tests</classifier>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-minicluster</artifactId>
-                       <version>${hadoop.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hbase</groupId>
-                       <artifactId>hbase-hadoop-compat</artifactId>
-                       <version>${hbase.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                       <version>${hadoop.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hbase</groupId>
-                       <artifactId>hbase-hadoop2-compat</artifactId>
-                       <version>${hbase.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-       </dependencies>
-
-       <profiles>
-               <profile>
-                       <id>cdh5.1.3</id>
-                       <properties>
-                               <hbase.version>0.98.1-cdh5.1.3</hbase.version>
-                               <hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
-                               <!-- Cloudera use different versions for hadoop 
core and commons-->
-                               <!-- This profile could be removed if Cloudera 
fix this mismatch! -->
-                               
<hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
-                       </properties>
-                       <dependencyManagement>
-                               <dependencies>
-                                       <dependency>
-                                               
<groupId>org.apache.hadoop</groupId>
-                                               
<artifactId>hadoop-core</artifactId>
-                                               
<version>${hadoop.core.version}</version>
-                                       </dependency>
-                               </dependencies>
-                       </dependencyManagement>
-               </profile>
-
-       </profiles>
-
-</project>

Reply via email to