http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
new file mode 100644
index 0000000..4d1acb4
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopMapFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testNonPassingMapper() throws Exception{
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new NonPassingMapper()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               nonPassingFlatMapDs.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               compareResultsByLinesInMemory("\n", resultPath);
+       }
+
+       @Test
+       public void testDataDuplicatingMapper() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new DuplicatingMapper()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               duplicatingFlatMapDs.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               String expected = "(1,Hi)\n" + "(1,HI)\n" +
+                               "(2,Hello)\n" + "(2,HELLO)\n" +
+                               "(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
+                               "(4,Hello world, how are you?)\n" + "(4,HELLO 
WORLD, HOW ARE YOU?)\n" +
+                               "(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
+                               "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" 
+
+                               "(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
+                               "(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
+                               "(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
+                               "(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
+                               "(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
+                               "(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
+                               "(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
+                               "(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
+                               "(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
+                               "(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
+                               "(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
+                               "(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
+                               "(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
+                               "(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
+                               "(21,Comment#15)\n" + "(21,COMMENT#15)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurableMapper() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.filterPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> hellos = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, 
Text, IntWritable, Text>(new ConfigurableMapper(), conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               String expected = "(2,Hello)\n" +
+                               "(3,Hello world)\n" +
+                               "(4,Hello world, how are you?)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+
+       
+       public static class NonPassingMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
+               
+               @Override
+               public void map(final IntWritable k, final Text v, 
+                               final OutputCollector<IntWritable, Text> out, 
final Reporter r) throws IOException {
+                       if ( v.toString().contains("bananas") ) {
+                               out.collect(k,v);
+                       }
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class DuplicatingMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
+               
+               @Override
+               public void map(final IntWritable k, final Text v, 
+                               final OutputCollector<IntWritable, Text> out, 
final Reporter r) throws IOException {
+                       out.collect(k, v);
+                       out.collect(k, new Text(v.toString().toUpperCase()));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableMapper implements Mapper<IntWritable, 
Text, IntWritable, Text> {
+               private String filterPrefix;
+               
+               @Override
+               public void map(IntWritable k, Text v, 
OutputCollector<IntWritable, Text> out, Reporter r)
+                               throws IOException {
+                       if(v.toString().startsWith(filterPrefix)) {
+                               out.collect(k, v);
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf c) {
+                       filterPrefix = c.get("my.filterPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
new file mode 100644
index 0000000..ccc0d82
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import 
org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopMapredITCase extends JavaProgramTestBase {
+       
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               this.setParallelism(4);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[]{".", "_"});
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               HadoopMapredCompatWordCount.main(new String[] { textPath, 
resultPath });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
new file mode 100644
index 0000000..13d971c
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.hamcrest.core.IsEqual;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceCombineFunctionITCase extends 
MultipleProgramsTestBase {
+
+       public HadoopReduceCombineFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testStandardCountingWithCombiner() throws Exception{
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper1());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new 
SumReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               counts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,5)\n"+
+                               "(1,6)\n" +
+                               "(2,6)\n" +
+                               "(3,4)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testUngroupedHadoopReducer() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper2());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
+                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new 
SumReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               sum.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,231)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testCombiner() throws Exception {
+               org.junit.Assume.assumeThat(mode, new 
IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER));
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper3());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new 
KeyChangingReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               counts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,5)\n"+
+                               "(1,6)\n" +
+                               "(2,5)\n" +
+                               "(3,5)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurationViaJobConf() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.cntPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper4());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+                                               new ConfigurableCntReducer(), 
conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               hellos.writeAsText(resultPath);
+               env.execute();
+
+               // return expected result
+               String expected = "(0,0)\n"+
+                               "(1,0)\n" +
+                               "(2,1)\n" +
+                               "(3,1)\n" +
+                               "(4,1)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+       public static class SumReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
+
+               @Override
+               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       
+                       int sum = 0;
+                       while(v.hasNext()) {
+                               sum += v.next().get();
+                       }
+                       out.collect(k, new IntWritable(sum));
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class KeyChangingReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
+
+               @Override
+               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       while(v.hasNext()) {
+                               out.collect(new IntWritable(k.get() % 4), 
v.next());
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               private String countPrefix;
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith(this.countPrefix)) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf c) { 
+                       this.countPrefix = c.get("my.cntPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+
+       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
+                       IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = new IntWritable(v.f0.get() / 6);
+                       outT.f1 = new IntWritable(1);
+                       return outT;
+               }
+       }
+
+       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
+                       IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = new IntWritable(0);
+                       outT.f1 = v.f0;
+                       return outT;
+               }
+       }
+
+       public static class Mapper3 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = v.f0;
+                       outT.f1 = new IntWritable(1);
+                       return outT;
+               }
+       }
+
+       public static class Mapper4 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() % 5);
+                       return v;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
new file mode 100644
index 0000000..abc0e9c
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopReduceFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testStandardGrouping() throws Exception{
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper1());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
CommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,3)\n" +
+                               "(2,5)\n" +
+                               "(3,5)\n" +
+                               "(4,2)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testUngroupedHadoopReducer() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
AllCommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(42,15)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurationViaJobConf() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.cntPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper2());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+                                               new ConfigurableCntReducer(), 
conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               helloCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,0)\n" +
+                               "(2,1)\n" +
+                               "(3,1)\n" +
+                               "(4,1)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+       public static class CommentCntReducer implements Reducer<IntWritable, 
Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class AllCommentCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(new IntWritable(42), new 
IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               private String countPrefix;
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith(this.countPrefix)) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf c) { 
+                       this.countPrefix = c.get("my.cntPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+
+       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() / 5);
+                       return v;
+               }
+       }
+
+       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() % 5);
+                       return v;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
new file mode 100644
index 0000000..eed6f8f
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+public class HadoopTestData {
+
+       public static DataSet<Tuple2<IntWritable, Text>> 
getKVPairDataSet(ExecutionEnvironment env) {
+               
+               List<Tuple2<IntWritable, Text>> data = new 
ArrayList<Tuple2<IntWritable, Text>>();
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new 
Text("Hi")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new 
Text("Hello")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new 
Text("Hello world")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new 
Text("Hello world, how are you?")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new 
Text("I am fine.")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new 
Text("Luke Skywalker")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new 
Text("Comment#1")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new 
Text("Comment#2")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new 
Text("Comment#3")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new 
Text("Comment#4")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new 
Text("Comment#5")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new 
Text("Comment#6")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new 
Text("Comment#7")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new 
Text("Comment#8")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new 
Text("Comment#9")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new 
Text("Comment#10")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new 
Text("Comment#11")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new 
Text("Comment#12")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new 
Text("Comment#13")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new 
Text("Comment#14")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new 
Text("Comment#15")));
+               
+               Collections.shuffle(data);
+               
+               return env.fromCollection(data);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
new file mode 100644
index 0000000..ce0143a
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
+ */
+public class HadoopMapredCompatWordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> 
<result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, new JobConf());
+               TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), 
new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
+               
+               DataSet<Tuple2<Text, LongWritable>> words = 
+                               text.flatMap(new 
HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+                                       .groupBy(0).reduceGroup(new 
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new 
Counter(), new Counter()));
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
+               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(outputPath));
+               
+               // Output & Execute
+               words.output(hadoopOutputFormat).setParallelism(1);
+               env.execute("Hadoop Compat WordCount");
+       }
+       
+       
+       public static final class Tokenizer implements Mapper<LongWritable, 
Text, Text, LongWritable> {
+
+               @Override
+               public void map(LongWritable k, Text v, OutputCollector<Text, 
LongWritable> out, Reporter rep) 
+                               throws IOException {
+                       // normalize and split the line
+                       String line = v.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Text(token), new 
LongWritable(1l));
+                               }
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+               
+       }
+       
+       public static final class Counter implements Reducer<Text, 
LongWritable, Text, LongWritable> {
+
+               @Override
+               public void reduce(Text k, Iterator<LongWritable> vs, 
OutputCollector<Text, LongWritable> out, Reporter rep)
+                               throws IOException {
+                       
+                       long cnt = 0;
+                       while(vs.hasNext()) {
+                               cnt += vs.next().get();
+                       }
+                       out.collect(k, new LongWritable(cnt));
+                       
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
new file mode 100644
index 0000000..524318c
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
+
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HadoopTupleUnwrappingIteratorTest {
+
+       @Test
+       public void testValueIterator() {
+               
+               HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
+                               new HadoopTupleUnwrappingIterator<IntWritable, 
IntWritable>(new WritableSerializer
+                                               
<IntWritable>(IntWritable.class));
+               
+               // many values
+               
+               ArrayList<Tuple2<IntWritable, IntWritable>> tList = new 
ArrayList<Tuple2<IntWritable, IntWritable>>();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(2)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(3)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(6)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(8)));
+               
+               int expectedKey = 1;
+               int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // one value
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(2),new IntWritable(10)));
+               
+               expectedKey = 2;
+               expectedValues = new int[]{10};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // more values
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(10)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(9)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(21)));
+               
+               expectedKey = 3;
+               expectedValues = new int[]{10,4,7,9,21};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // no has next calls
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(8)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(42)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(-1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(0)));
+               
+               expectedKey = 4;
+               expectedValues = new int[]{5,8,42,-1,0};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+               }
+               try {
+                       valIt.next();
+                       Assert.fail();
+               } catch (NoSuchElementException nsee) {
+                       // expected
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..698e356
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+       
+       protected String textPath;
+       protected String resultPath;
+       
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               this.setParallelism(4);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[]{".", "_"});
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               WordCount.main(new String[] { textPath, resultPath });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..ed83d78
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> 
<result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               Job job = Job.getInstance();
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, job);
+               TextInputFormat.addInputPath(job, new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
+               
+               // Tokenize the line and convert from Writable "Text" to String 
for better handling
+               DataSet<Tuple2<String, Integer>> words = text.flatMap(new 
Tokenizer());
+               
+               // Sum up the words
+               DataSet<Tuple2<String, Integer>> result = 
words.groupBy(0).aggregate(Aggregations.SUM, 1);
+               
+               // Convert String back to Writable "Text" for use with Hadoop 
Output Format
+               DataSet<Tuple2<Text, IntWritable>> hadoopResult = 
result.map(new HadoopDatatypeMapper());
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new 
HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, 
IntWritable>(), job);
+               
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator",
 " ");
+               
hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", 
" "); // set the value for both, since this test
+               TextOutputFormat.setOutputPath(job, new Path(outputPath));
+               
+               // Output & Execute
+               hadoopResult.output(hadoopOutputFormat);
+               env.execute("Word Count");
+       }
+       
+       /**
+        * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
+        */
+       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+               
+               @Override
+               public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
+                       // normalize and split the line
+                       String line = value.f1.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Converts Java data types to Hadoop Writables.
+        */
+       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+               
+               @Override
+               public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {
+                       return new Tuple2<Text, IntWritable>(new 
Text(value.f0), new IntWritable(value.f1));
+               }
+               
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/pom.xml 
b/flink-connectors/flink-hbase/pom.xml
new file mode 100644
index 0000000..9b1e174
--- /dev/null
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hbase_2.10</artifactId>
+       <name>flink-hbase</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <hbase.version>1.2.3</hbase.version>
+       </properties>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19.1</version>
+                               <configuration>
+                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <!-- Disable inherited 
shade-flink because of a problem in the shade plugin -->
+                                               <!-- When enabled you'll run 
into an infinite loop creating the dependency-reduced-pom.xml -->
+                                               <!-- Seems similar to 
https://issues.apache.org/jira/browse/MSHADE-148 -->
+                                               <id>shade-flink</id>
+                                               <phase>none</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+
+                       <!--Exclude Guava in order to run the HBaseMiniCluster 
during testing-->
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.google.guava</groupId>
+                                       <artifactId>guava</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- HBase server needed for TableOutputFormat -->
+               <!-- TODO implement bulk output format for HBase -->
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <version>${hbase.version}</version>
+                       <exclusions>
+                               <!-- Remove unneeded dependency, which is 
conflicting with our jetty-util version. -->
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-util</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-sslengine</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jsp-2.1</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jsp-api-2.1</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>servlet-api-2.5</artifactId>
+                               </exclusion>
+                               <!-- The hadoop dependencies are handled 
through flink-shaded-hadoop -->
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-auth</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       
<artifactId>hadoop-annotations</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       
<artifactId>hadoop-mapreduce-client-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-client</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-hdfs</artifactId>
+                               </exclusion>
+                               <!-- Bug in hbase annotations, can be removed 
when fixed. See FLINK-2153. -->
+                               <exclusion>
+                                       <groupId>org.apache.hbase</groupId>
+                                       
<artifactId>hbase-annotations</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-core</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.flink</groupId>
+                                       
<artifactId>flink-shaded-include-yarn_2.10</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Test dependencies are only available for Hadoop-2. -->
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <version>${hbase.version}</version>
+                       <classifier>tests</classifier>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minicluster</artifactId>
+                       <version>${hadoop.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop-compat</artifactId>
+                       <version>${hbase.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <version>${hadoop.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop2-compat</artifactId>
+                       <version>${hbase.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>cdh5.1.3</id>
+                       <properties>
+                               <hbase.version>0.98.1-cdh5.1.3</hbase.version>
+                               <hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
+                               <!-- Cloudera use different versions for hadoop 
core and commons-->
+                               <!-- This profile could be removed if Cloudera 
fix this mismatch! -->
+                               
<hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
+                       </properties>
+                       <dependencyManagement>
+                               <dependencies>
+                                       <dependency>
+                                               
<groupId>org.apache.hadoop</groupId>
+                                               
<artifactId>hadoop-core</artifactId>
+                                               
<version>${hadoop.core.version}</version>
+                                       </dependency>
+                               </dependencies>
+                       </dependencyManagement>
+               </profile>
+
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
new file mode 100644
index 0000000..35b0a7c
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables.
+ */
+public abstract class TableInputFormat<T extends Tuple> extends 
RichInputFormat<T, TableInputSplit> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TableInputFormat.class);
+
+       /** helper variable to decide whether the input is exhausted or not */
+       private boolean endReached = false;
+
+       protected transient HTable table = null;
+       protected transient Scan scan = null;
+
+       /** HBase iterator wrapper */
+       private ResultScanner resultScanner = null;
+
+       private byte[] lastRow;
+       private int scannedRows;
+
+       /**
+        * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
+        * @return The appropriate instance of Scan for this usecase.
+        */
+       protected abstract Scan getScanner();
+
+       /**
+        * What table is to be read.
+        * Per instance of a TableInputFormat derivative only a single 
tablename is possible.
+        * @return The name of the table
+        */
+       protected abstract String getTableName();
+
+       /**
+        * The output from HBase is always an instance of {@link Result}.
+        * This method is to copy the data in the Result instance into the 
required {@link Tuple}
+        * @param r The Result instance from HBase that needs to be converted
+        * @return The approriate instance of {@link Tuple} that contains the 
needed information.
+        */
+       protected abstract T mapResultToTuple(Result r);
+
+       /**
+        * Creates a {@link Scan} object and opens the {@link HTable} 
connection.
+        * These are opened here because they are needed in the 
createInputSplits
+        * which is called before the openInputFormat method.
+        * So the connection is opened in {@link #configure(Configuration)} and 
closed in {@link #closeInputFormat()}.
+        *
+        * @param parameters The configuration that is to be used
+        * @see Configuration
+        */
+       @Override
+       public void configure(Configuration parameters) {
+               table = createTable();
+               if (table != null) {
+                       scan = getScanner();
+               }
+       }
+
+       /**
+        * Create an {@link HTable} instance and set it into this format
+        */
+       private HTable createTable() {
+               LOG.info("Initializing HBaseConfiguration");
+               //use files found in the classpath
+               org.apache.hadoop.conf.Configuration hConf = 
HBaseConfiguration.create();
+
+               try {
+                       return new HTable(hConf, getTableName());
+               } catch (Exception e) {
+                       LOG.error("Error instantiating a new HTable instance", 
e);
+               }
+               return null;
+       }
+
+       @Override
+       public void open(TableInputSplit split) throws IOException {
+               if (table == null) {
+                       throw new IOException("The HBase table has not been 
opened!");
+               }
+               if (scan == null) {
+                       throw new IOException("getScanner returned null");
+               }
+               if (split == null) {
+                       throw new IOException("Input split is null!");
+               }
+
+               logSplitInfo("opening", split);
+               scan.setStartRow(split.getStartRow());
+               lastRow = split.getEndRow();
+               scan.setStopRow(lastRow);
+
+               resultScanner = table.getScanner(scan);
+               endReached = false;
+               scannedRows = 0;
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return endReached;
+       }
+
+       @Override
+       public T nextRecord(T reuse) throws IOException {
+               if (resultScanner == null) {
+                       throw new IOException("No table result scanner 
provided!");
+               }
+               try {
+                       Result res = resultScanner.next();
+                       if (res != null) {
+                               scannedRows++;
+                               lastRow = res.getRow();
+                               return mapResultToTuple(res);
+                       }
+               } catch (Exception e) {
+                       resultScanner.close();
+                       //workaround for timeout on scan
+                       LOG.warn("Error after scan of " + scannedRows + " rows. 
Retry with a new scanner...", e);
+                       scan.setStartRow(lastRow);
+                       resultScanner = table.getScanner(scan);
+                       Result res = resultScanner.next();
+                       if (res != null) {
+                               scannedRows++;
+                               lastRow = res.getRow();
+                               return mapResultToTuple(res);
+                       }
+               }
+
+               endReached = true;
+               return null;
+       }
+
+       @Override
+       public void close() throws IOException {
+               LOG.info("Closing split (scanned {} rows)", scannedRows);
+               lastRow = null;
+               try {
+                       if (resultScanner != null) {
+                               resultScanner.close();
+                       }
+               } finally {
+                       resultScanner = null;
+               }
+       }
+
+       @Override
+       public void closeInputFormat() throws IOException {
+               try {
+                       if (table != null) {
+                               table.close();
+                       }
+               } finally {
+                       table = null;
+               }
+       }
+
+       @Override
+       public TableInputSplit[] createInputSplits(final int minNumSplits) 
throws IOException {
+               if (table == null) {
+                       throw new IOException("The HBase table has not been 
opened!");
+               }
+               if (scan == null) {
+                       throw new IOException("getScanner returned null");
+               }
+
+               //Gets the starting and ending row keys for every region in the 
currently open table
+               final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+               if (keys == null || keys.getFirst() == null || 
keys.getFirst().length == 0) {
+                       throw new IOException("Expecting at least one region.");
+               }
+               final byte[] startRow = scan.getStartRow();
+               final byte[] stopRow = scan.getStopRow();
+               final boolean scanWithNoLowerBound = startRow.length == 0;
+               final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+               final List<TableInputSplit> splits = new 
ArrayList<TableInputSplit>(minNumSplits);
+               for (int i = 0; i < keys.getFirst().length; i++) {
+                       final byte[] startKey = keys.getFirst()[i];
+                       final byte[] endKey = keys.getSecond()[i];
+                       final String regionLocation = 
table.getRegionLocation(startKey, false).getHostnamePort();
+                       //Test if the given region is to be included in the 
InputSplit while splitting the regions of a table
+                       if (!includeRegionInSplit(startKey, endKey)) {
+                               continue;
+                       }
+                       //Finds the region on which the given row is being 
served
+                       final String[] hosts = new String[]{regionLocation};
+
+                       // determine if regions contains keys used by the scan
+                       boolean isLastRegion = endKey.length == 0;
+                       if ((scanWithNoLowerBound || isLastRegion || 
Bytes.compareTo(startRow, endKey) < 0) &&
+                               (scanWithNoUpperBound || 
Bytes.compareTo(stopRow, startKey) > 0)) {
+
+                               final byte[] splitStart = scanWithNoLowerBound 
|| Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+                               final byte[] splitStop = (scanWithNoUpperBound 
|| Bytes.compareTo(endKey, stopRow) <= 0)
+                                       && !isLastRegion ? endKey : stopRow;
+                               int id = splits.size();
+                               final TableInputSplit split = new 
TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
+                               splits.add(split);
+                       }
+               }
+               LOG.info("Created " + splits.size() + " splits");
+               for (TableInputSplit split : splits) {
+                       logSplitInfo("created", split);
+               }
+               return splits.toArray(new TableInputSplit[0]);
+       }
+
+       private void logSplitInfo(String action, TableInputSplit split) {
+               int splitId = split.getSplitNumber();
+               String splitStart = Bytes.toString(split.getStartRow());
+               String splitEnd = Bytes.toString(split.getEndRow());
+               String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+               String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+               String[] hostnames = split.getHostnames();
+               LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, 
splitId, hostnames, splitStartKey, splitStopKey);
+       }
+
+       /**
+        * Test if the given region is to be included in the InputSplit while 
splitting the regions of a table.
+        * <p>
+        * This optimization is effective when there is a specific reasoning to 
exclude an entire region from the M-R job,
+        * (and hence, not contributing to the InputSplit), given the start and 
end keys of the same. <br>
+        * Useful when we need to remember the last-processed top record and 
revisit the [last, current) interval for M-R
+        * processing, continuously. In addition to reducing InputSplits, 
reduces the load on the region server as well, due
+        * to the ordering of the keys. <br>
+        * <br>
+        * Note: It is possible that <code>endKey.length() == 0 </code> , for 
the last (recent) region. <br>
+        * Override this method, if you want to bulk exclude regions altogether 
from M-R. By default, no region is excluded(
+        * i.e. all regions are included).
+        *
+        * @param startKey Start key of the region
+        * @param endKey   End key of the region
+        * @return true, if this region needs to be included as part of the 
input (default).
+        */
+       protected boolean includeRegionInSplit(final byte[] startKey, final 
byte[] endKey) {
+               return true;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] 
inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
new file mode 100644
index 0000000..75f0b9b
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+
+/**
+ * This class implements a input splits for HBase. Each table input split 
corresponds to a key range (low, high). All
+ * references to row below refer to the key of the row.
+ */
+public class TableInputSplit extends LocatableInputSplit {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The name of the table to retrieve data from */
+       private final byte[] tableName;
+
+       /** The start row of the split. */
+       private final byte[] startRow;
+
+       /** The end row of the split. */
+       private final byte[] endRow;
+
+       /**
+        * Creates a new table input split
+        * 
+        * @param splitNumber
+        *        the number of the input split
+        * @param hostnames
+        *        the names of the hosts storing the data the input split 
refers to
+        * @param tableName
+        *        the name of the table to retrieve data from
+        * @param startRow
+        *        the start row of the split
+        * @param endRow
+        *        the end row of the split
+        */
+       TableInputSplit(final int splitNumber, final String[] hostnames, final 
byte[] tableName, final byte[] startRow,
+                       final byte[] endRow) {
+               super(splitNumber, hostnames);
+
+               this.tableName = tableName;
+               this.startRow = startRow;
+               this.endRow = endRow;
+       }
+
+       /**
+        * Returns the table name.
+        * 
+        * @return The table name.
+        */
+       public byte[] getTableName() {
+               return this.tableName;
+       }
+
+       /**
+        * Returns the start row.
+        * 
+        * @return The start row.
+        */
+       public byte[] getStartRow() {
+               return this.startRow;
+       }
+
+       /**
+        * Returns the end row.
+        * 
+        * @return The end row.
+        */
+       public byte[] getEndRow() {
+               return this.endRow;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
new file mode 100644
index 0000000..3d9f672
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * By using this class as the super class of a set of tests you will have a 
HBase testing
+ * cluster available that is very suitable for writing tests for scanning and 
filtering against.
+ * This is usable by any downstream application because the HBase cluster is 
'injected' because
+ * a dynamically generated hbase-site.xml is added to the classpath.
+ * Because of this classpath manipulation it is not possible to start a second 
testing cluster in the same JVM.
+ * So if you have this you should either put all hbase related tests in a 
single class or force surefire to
+ * setup a new JVM for each testclass.
+ * See: 
http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
+ */
+//
+// NOTE: The code in this file is based on code from the
+// Apache HBase project, licensed under the Apache License v 2.0
+//
+// 
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
+//
+public class HBaseTestingClusterAutostarter implements Serializable {
+
+       private static final Log LOG = 
LogFactory.getLog(HBaseTestingClusterAutostarter.class);
+
+       private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+       private static HBaseAdmin admin = null;
+       private static List<TableName> createdTables = new ArrayList<>();
+
+       private static boolean alreadyRegisteredTestCluster = false;
+
+       protected static void createTable(TableName tableName, byte[] 
columnFamilyName, byte[][] splitKeys) {
+               LOG.info("HBase minicluster: Creating table " + 
tableName.getNameAsString());
+
+               assertNotNull("HBaseAdmin is not initialized successfully.", 
admin);
+               HTableDescriptor desc = new HTableDescriptor(tableName);
+               HColumnDescriptor colDef = new 
HColumnDescriptor(columnFamilyName);
+               desc.addFamily(colDef);
+
+               try {
+                       admin.createTable(desc, splitKeys);
+                       createdTables.add(tableName);
+                       assertTrue("Fail to create the table", 
admin.tableExists(tableName));
+               } catch (IOException e) {
+                       assertNull("Exception found while creating table", e);
+               }
+       }
+
+       protected static HTable openTable(TableName tableName) throws 
IOException {
+               HTable table = (HTable) 
admin.getConnection().getTable(tableName);
+               assertTrue("Fail to create the table", 
admin.tableExists(tableName));
+               return table;
+       }
+
+       private static void deleteTables() {
+               if (admin != null) {
+                       for (TableName tableName : createdTables) {
+                               try {
+                                       if (admin.tableExists(tableName)) {
+                                               admin.disableTable(tableName);
+                                               admin.deleteTable(tableName);
+                                       }
+                               } catch (IOException e) {
+                                       assertNull("Exception found deleting 
the table", e);
+                               }
+                       }
+               }
+       }
+
+       private static void initialize(Configuration conf) {
+               conf = HBaseConfiguration.create(conf);
+               conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+               try {
+                       admin = TEST_UTIL.getHBaseAdmin();
+               } catch (MasterNotRunningException e) {
+                       assertNull("Master is not running", e);
+               } catch (ZooKeeperConnectionException e) {
+                       assertNull("Cannot connect to ZooKeeper", e);
+               } catch (IOException e) {
+                       assertNull("IOException", e);
+               }
+       }
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               LOG.info("HBase minicluster: Starting");
+               ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL);
+               ((Log4JLogger) 
AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
+               ((Log4JLogger) 
ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
+
+               TEST_UTIL.startMiniCluster(1);
+
+               // https://issues.apache.org/jira/browse/HBASE-11711
+               TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 
-1);
+
+               // Make sure the zookeeper quorum value contains the right port 
number (varies per run).
+               TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", 
"localhost:" + TEST_UTIL.getZkCluster().getClientPort());
+
+               initialize(TEST_UTIL.getConfiguration());
+               LOG.info("HBase minicluster: Running");
+       }
+
+       private static File hbaseSiteXmlDirectory;
+       private static File hbaseSiteXmlFile;
+
+       /**
+        * This dynamically generates a hbase-site.xml file that is added to 
the classpath.
+        * This way this HBaseMinicluster can be used by an unmodified 
application.
+        * The downside is that this cannot be 'unloaded' so you can have only 
one per JVM.
+        */
+       public static void registerHBaseMiniClusterInClasspath() {
+               if (alreadyRegisteredTestCluster) {
+                       fail("You CANNOT register a second HBase Testing 
cluster in the classpath of the SAME JVM");
+               }
+               File baseDir = new File(System.getProperty("java.io.tmpdir", 
"/tmp/"));
+               hbaseSiteXmlDirectory = new File(baseDir, 
"unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
+
+               if (!hbaseSiteXmlDirectory.mkdirs()) {
+                       fail("Unable to create output directory " + 
hbaseSiteXmlDirectory + " for the HBase minicluster");
+               }
+
+               assertNotNull("The ZooKeeper for the HBase minicluster is 
missing", TEST_UTIL.getZkCluster());
+
+               createHBaseSiteXml(hbaseSiteXmlDirectory, 
TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
+               addDirectoryToClassPath(hbaseSiteXmlDirectory);
+
+               // Avoid starting it again.
+               alreadyRegisteredTestCluster = true;
+       }
+
+       private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, 
String zookeeperQuorum) {
+               hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, 
"hbase-site.xml");
+               // Create the hbase-site.xml file for this run.
+               try {
+                       String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
+                               "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n" +
+                               "<configuration>\n" +
+                               "  <property>\n" +
+                               "    <name>hbase.zookeeper.quorum</name>\n" +
+                               "    <value>" + zookeeperQuorum + "</value>\n" +
+                               "  </property>\n" +
+                               "</configuration>";
+                       OutputStream fos = new 
FileOutputStream(hbaseSiteXmlFile);
+                       
fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
+                       fos.close();
+               } catch (IOException e) {
+                       fail("Unable to create " + hbaseSiteXmlFile);
+               }
+       }
+
+       private static void addDirectoryToClassPath(File directory) {
+               try {
+                       // Get the classloader actually used by 
HBaseConfiguration
+                       ClassLoader classLoader = 
HBaseConfiguration.create().getClassLoader();
+                       if (!(classLoader instanceof URLClassLoader)) {
+                               fail("We should get a URLClassLoader");
+                       }
+
+                       // Make the addURL method accessible
+                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+                       method.setAccessible(true);
+
+                       // Add the directory where we put the hbase-site.xml to 
the classpath
+                       method.invoke(classLoader, directory.toURI().toURL());
+               } catch (MalformedURLException | NoSuchMethodException | 
IllegalAccessException | InvocationTargetException e) {
+                       fail("Unable to add " + directory + " to classpath 
because of this exception: " + e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               LOG.info("HBase minicluster: Shutting down");
+               deleteTables();
+               hbaseSiteXmlFile.delete();
+               hbaseSiteXmlDirectory.delete();
+               TEST_UTIL.shutdownMiniCluster();
+               LOG.info("HBase minicluster: Down");
+       }
+
+}

Reply via email to