http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..97b9768
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+                                       extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+       private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+       private transient JobConf jobConf;
+       
+       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+       private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+       private transient Reporter reporter;
+
+       /**
+        * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+        * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+        */
+       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+                                                                               Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+               this(hadoopReducer, hadoopCombiner, new JobConf());
+       }
+       
+       /**
+        * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+        * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+        * @param conf The JobConf that is used to configure both Hadoop Reducers.
+        */
+       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+                                                               Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+               if(hadoopReducer == null) {
+                       throw new NullPointerException("Reducer may not be null.");
+               }
+               if(hadoopCombiner == null) {
+                       throw new NullPointerException("Combiner may not be null.");
+               }
+               if(conf == null) {
+                       throw new NullPointerException("JobConf may not be null.");
+               }
+               
+               this.reducer = hadoopReducer;
+               this.combiner = hadoopCombiner;
+               this.jobConf = conf;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.reducer.configure(jobConf);
+               this.combiner.configure(jobConf);
+               
+               this.reporter = new HadoopDummyReporter();
+               Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+               TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass((Class<KEYIN>) inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+               this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
+               this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+       }
+
+       @Override
+       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+                       throws Exception {
+               reduceCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+       }
+
+       @Override
+       public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+               combineCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+               Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+               final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+       }
+
+       /**
+        * Custom serialization methods.
+        * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+        */
+       private void writeObject(final ObjectOutputStream out) throws IOException {
+               
+               out.writeObject(reducer.getClass());
+               out.writeObject(combiner.getClass());
+               jobConf.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+                               (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+               reducer = InstantiationUtil.instantiate(reducerClass);
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
+                               (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+               combiner = InstantiationUtil.instantiate(combinerClass);
+               
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+       }
+}
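
For quick reference, the following is a minimal sketch of how this wrapper plugs into a Flink batch job. The class name ReduceCombineSketch and the in-memory input are illustrative only; the sketch reuses the Counter reducer from the HadoopMapredCompatWordCount example further down in this commit and passes it as both the reducer and the combiner.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class ReduceCombineSketch {
	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Tiny in-memory key-value data set standing in for real Hadoop input.
		DataSet<Tuple2<Text, LongWritable>> pairs = env.fromElements(
				new Tuple2<Text, LongWritable>(new Text("hello"), new LongWritable(1L)),
				new Tuple2<Text, LongWritable>(new Text("hello"), new LongWritable(1L)),
				new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(1L)));

		// The Hadoop Reducer is used both as reducer and as combiner, configured by a fresh JobConf.
		DataSet<Tuple2<Text, LongWritable>> counts = pairs
				.groupBy(0)
				.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
						new HadoopMapredCompatWordCount.Counter(),
						new HadoopMapredCompatWordCount.Counter(),
						new JobConf()));

		counts.writeAsText(args[0]);
		env.execute("HadoopReduceCombineFunction sketch");
	}
}

As in the IT cases below, the wrapper is attached with reduceGroup() after a groupBy() on the key field; since the function is marked combinable, the combiner runs on partial groups before the data is shuffled.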

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..1c47696
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+                                       extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+       private transient JobConf jobConf;
+       
+       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+       private transient Reporter reporter;
+       
+       /**
+        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer to wrap.
+        */
+       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+               this(hadoopReducer, new JobConf());
+       }
+       
+       /**
+        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer to wrap.
+        * @param conf The JobConf that is used to configure the Hadoop Reducer.
+        */
+       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+               if(hadoopReducer == null) {
+                       throw new NullPointerException("Reducer may not be null.");
+               }
+               if(conf == null) {
+                       throw new NullPointerException("JobConf may not be null.");
+               }
+               
+               this.reducer = hadoopReducer;
+               this.jobConf = conf;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.reducer.configure(jobConf);
+               
+               this.reporter = new HadoopDummyReporter();
+               this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+               Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+               TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+       }
+
+       @Override
+       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+                       throws Exception {
+               
+               reduceCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+               Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+               final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+       }
+
+       /**
+        * Custom serialization methods
+        * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+        */
+       private void writeObject(final ObjectOutputStream out) throws IOException {
+               
+               out.writeObject(reducer.getClass());
+               jobConf.write(out);             
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+                               (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+               reducer = InstantiationUtil.instantiate(reducerClass);
+               
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+       }
+}
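
The following is a small, self-contained sketch of the typical wiring, in particular how a JobConf shipped with the wrapper reaches the Reducer's configure() method. PrefixCountReducer, the property name sketch.prefix, and the in-memory input are made up for illustration and mirror the ConfigurableCntReducer used in the IT cases at the end of this commit.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ReduceFunctionSketch {

	// A toy Reducer that counts the values of a group which start with a prefix taken from the JobConf.
	public static class PrefixCountReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
		private String prefix;

		@Override
		public void configure(JobConf conf) {
			prefix = conf.get("sketch.prefix", "");
		}

		@Override
		public void reduce(IntWritable k, Iterator<Text> vs,
				OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
			int cnt = 0;
			while (vs.hasNext()) {
				if (vs.next().toString().startsWith(prefix)) {
					cnt++;
				}
			}
			out.collect(k, new IntWritable(cnt));
		}

		@Override
		public void close() throws IOException { }
	}

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<IntWritable, Text>> pairs = env.fromElements(
				new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hello")),
				new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("World")),
				new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello again")));

		JobConf conf = new JobConf();
		conf.set("sketch.prefix", "Hello");

		// The JobConf is serialized with the wrapper and handed to the Reducer's configure() in open().
		DataSet<Tuple2<IntWritable, IntWritable>> counts = pairs
				.groupBy(0)
				.reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
						new PrefixCountReducer(), conf));

		counts.writeAsText(args[0]);
		env.execute("HadoopReduceFunction sketch");
	}
}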

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
new file mode 100644
index 0000000..3547e47
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
+ */
+public class HadoopMapredCompatWordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> <result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
+               TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+               
+               DataSet<Tuple2<Text, LongWritable>> words = 
+                               text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+                                       .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
+                               new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+               hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
+               
+               // Output & Execute
+               words.output(hadoopOutputFormat).setParallelism(1);
+               env.execute("Hadoop Compat WordCount");
+       }
+       
+       
+       public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
+
+               @Override
+               public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) 
+                               throws IOException {
+                       // normalize and split the line
+                       String line = v.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Text(token), new LongWritable(1l));
+                               }
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+               
+       }
+       
+       public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
+
+               @Override
+               public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
+                               throws IOException {
+                       
+                       long cnt = 0;
+                       while(vs.hasNext()) {
+                               cnt += vs.next().get();
+                       }
+                       out.collect(k, new LongWritable(cnt));
+                       
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..fcb6841
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink OutputCollector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ * 
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopOutputCollector<KEY,VALUE>
+               implements OutputCollector<KEY,VALUE> {
+
+       private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+       
+       private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+       /**
+        * Set the wrapped Flink collector.
+        * 
+        * @param flinkCollector The wrapped Flink OutputCollector.
+        */
+       public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+               this.flinkCollector = flinkCollector;
+       }
+       
+       /**
+        * Use the wrapped Flink collector to collect a key-value pair for Flink. 
+        * 
+        * @param key the key to collect
+        * @param val the value to collect
+        * @throws IOException if the key-value pair cannot be forwarded to the Flink collector.
+        */
+       @Override
+       public void collect(final KEY key, final VALUE val) throws IOException {
+
+               this.outTuple.f0 = key;
+               this.outTuple.f1 = val;
+               this.flinkCollector.collect(outTuple);
+       }
+       
+}
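
The collector is normally created inside the reduce and combine wrappers above, but its contract is easy to see in isolation. A minimal sketch follows (the class name OutputCollectorSketch and the inline Collector are illustrative): calls arriving through the Hadoop OutputCollector interface are forwarded to whatever Flink Collector was set. Because the wrapper reuses a single Tuple2 instance, a receiver that keeps records should copy them.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class OutputCollectorSketch {
	public static void main(String[] args) throws Exception {
		final List<Tuple2<Text, IntWritable>> collected = new ArrayList<Tuple2<Text, IntWritable>>();

		// A trivial Flink Collector that copies each forwarded tuple into a list.
		Collector<Tuple2<Text, IntWritable>> flinkCollector = new Collector<Tuple2<Text, IntWritable>>() {
			@Override
			public void collect(Tuple2<Text, IntWritable> record) {
				// The HadoopOutputCollector reuses one Tuple2, so copy the fields we keep.
				collected.add(new Tuple2<Text, IntWritable>(record.f0, record.f1));
			}

			@Override
			public void close() { }
		};

		HadoopOutputCollector<Text, IntWritable> hadoopCollector = new HadoopOutputCollector<Text, IntWritable>();
		hadoopCollector.setFlinkCollector(flinkCollector);

		hadoopCollector.collect(new Text("hello"), new IntWritable(1));
		hadoopCollector.collect(new Text("world"), new IntWritable(2));

		System.out.println(collected); // [(hello,1), (world,2)]
	}
}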

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..a063183
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
+ */
+@SuppressWarnings("rawtypes")
+public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
+                                                                       extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+       
+       private Iterator<Tuple2<KEY,VALUE>> iterator;
+       
+       private final TypeSerializer<KEY> keySerializer;
+       
+       private boolean atFirst = false;
+       private KEY curKey = null;
+       private VALUE firstValue = null;
+       
+       public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
+               this.keySerializer = keySerializer;
+       }
+       
+       /**
+       * Set the Flink iterator to wrap.
+       * 
+       * @param iterator The Flink iterator to wrap.
+       */
+       @Override
+       public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+               this.iterator = iterator;
+               if(this.hasNext()) {
+                       final Tuple2<KEY, VALUE> tuple = iterator.next();
+                       this.curKey = keySerializer.copy(tuple.f0);
+                       this.firstValue = tuple.f1;
+                       this.atFirst = true;
+               } else {
+                       this.atFirst = false;
+               }
+       }
+       
+       @Override
+       public boolean hasNext() {
+               if(this.atFirst) {
+                       return true;
+               }
+               return iterator.hasNext();
+       }
+       
+       @Override
+       public VALUE next() {
+               if(this.atFirst) {
+                       this.atFirst = false;
+                       return firstValue;
+               }
+               
+               final Tuple2<KEY, VALUE> tuple = iterator.next();
+               return tuple.f1;
+       }
+       
+       public KEY getCurrentKey() {
+               return this.curKey;
+       }
+       
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+}
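
A standalone sketch of the iterator's contract, assuming a key serializer can be created from a plain ExecutionConfig outside of a RuntimeContext (inside the wrappers above it comes from getRuntimeContext().getExecutionConfig()): after set(), getCurrentKey() returns a copy of the first tuple's key and the iterator yields only the value fields of the group. The class name TupleUnwrappingSketch and the sample data are illustrative.

import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TupleUnwrappingSketch {
	public static void main(String[] args) {
		TypeSerializer<IntWritable> keySerializer =
				TypeExtractor.getForClass(IntWritable.class).createSerializer(new ExecutionConfig());

		HadoopTupleUnwrappingIterator<IntWritable, Text> it =
				new HadoopTupleUnwrappingIterator<IntWritable, Text>(keySerializer);

		// All tuples of one group carry the same key; set() caches a copy of it,
		// afterwards the iterator walks over the value fields only.
		it.set(Arrays.asList(
				new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("a")),
				new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("b"))).iterator());

		System.out.println(it.getCurrentKey()); // 7
		while (it.hasNext()) {
			System.out.println(it.next());      // a, then b
		}
	}
}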

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..f5758eb
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> <result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               Job job = Job.getInstance();
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
+               TextInputFormat.addInputPath(job, new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+               
+               // Tokenize the line and convert from Writable "Text" to String for better handling
+               DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
+               
+               // Sum up the words
+               DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
+               
+               // Convert String back to Writable "Text" for use with Hadoop Output Format
+               DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
+               hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+               hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
+               // is being executed with both types (hadoop1 and hadoop2 profile)
+               TextOutputFormat.setOutputPath(job, new Path(outputPath));
+               
+               // Output & Execute
+               hadoopResult.output(hadoopOutputFormat);
+               env.execute("Word Count");
+       }
+       
+       /**
+        * Splits a line into words and converts Hadoop Writables into normal Java data types.
+        */
+       public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+               
+               @Override
+               public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
+                       // normalize and split the line
+                       String line = value.f1.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Converts Java data types to Hadoop Writables.
+        */
+       public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+               
+               @Override
+               public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
+                       return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
+               }
+               
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
new file mode 100644
index 0000000..4d1acb4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopMapFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testNonPassingMapper() throws Exception{
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               compareResultsByLinesInMemory("\n", resultPath);
+       }
+
+       @Test
+       public void testDataDuplicatingMapper() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               String expected = "(1,Hi)\n" + "(1,HI)\n" +
+                               "(2,Hello)\n" + "(2,HELLO)\n" +
+                               "(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
+                               "(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
+                               "(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
+                               "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
+                               "(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
+                               "(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
+                               "(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
+                               "(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
+                               "(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
+                               "(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
+                               "(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
+                               "(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
+                               "(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
+                               "(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
+                               "(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
+                               "(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
+                               "(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
+                               "(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
+                               "(21,Comment#15)\n" + "(21,COMMENT#15)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurableMapper() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.filterPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+               DataSet<Tuple2<IntWritable, Text>> hellos = ds.
+                               flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+               env.execute();
+
+               String expected = "(2,Hello)\n" +
+                               "(3,Hello world)\n" +
+                               "(4,Hello world, how are you?)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+
+       
+       public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+               
+               @Override
+               public void map(final IntWritable k, final Text v, 
+                               final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+                       if ( v.toString().contains("bananas") ) {
+                               out.collect(k,v);
+                       }
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+               
+               @Override
+               public void map(final IntWritable k, final Text v, 
+                               final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+                       out.collect(k, v);
+                       out.collect(k, new Text(v.toString().toUpperCase()));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+               private String filterPrefix;
+               
+               @Override
+               public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
+                               throws IOException {
+                       if(v.toString().startsWith(filterPrefix)) {
+                               out.collect(k, v);
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf c) {
+                       filterPrefix = c.get("my.filterPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
new file mode 100644
index 0000000..bbb7503
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopMapredITCase extends JavaProgramTestBase {
+       
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               this.setParallelism(4);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
new file mode 100644
index 0000000..13d971c
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.hamcrest.core.IsEqual;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopReduceCombineFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testStandardCountingWithCombiner() throws Exception{
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper1());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+                               groupBy(0).
+                               reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new SumReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               counts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,5)\n"+
+                               "(1,6)\n" +
+                               "(2,6)\n" +
+                               "(3,4)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testUngroupedHadoopReducer() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper2());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
+                               reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new SumReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               sum.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,231)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testCombiner() throws Exception {
+               org.junit.Assume.assumeThat(mode, new IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER));
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper3());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+                               groupBy(0).
+                               reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+                                               new SumReducer(), new KeyChangingReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               counts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,5)\n"+
+                               "(1,6)\n" +
+                               "(2,5)\n" +
+                               "(3,5)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurationViaJobConf() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.cntPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper4());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
+                               groupBy(0).
+                               reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+                                               new ConfigurableCntReducer(), conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               hellos.writeAsText(resultPath);
+               env.execute();
+
+               // expected result
+               String expected = "(0,0)\n"+
+                               "(1,0)\n" +
+                               "(2,1)\n" +
+                               "(3,1)\n" +
+                               "(4,1)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+       public static class SumReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
+
+               @Override
+               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       
+                       int sum = 0;
+                       while(v.hasNext()) {
+                               sum += v.next().get();
+                       }
+                       out.collect(k, new IntWritable(sum));
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class KeyChangingReducer implements Reducer<IntWritable, 
IntWritable, IntWritable, IntWritable> {
+
+               @Override
+               public void reduce(IntWritable k, Iterator<IntWritable> v, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       while(v.hasNext()) {
+                               out.collect(new IntWritable(k.get() % 4), 
v.next());
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               private String countPrefix;
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith(this.countPrefix)) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf c) { 
+                       this.countPrefix = c.get("my.cntPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+
+       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
+                       IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = new IntWritable(v.f0.get() / 6);
+                       outT.f1 = new IntWritable(1);
+                       return outT;
+               }
+       }
+
+       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable,
+                       IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = new IntWritable(0);
+                       outT.f1 = v.f0;
+                       return outT;
+               }
+       }
+
+       public static class Mapper3 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, IntWritable>> {
+               private static final long serialVersionUID = 1L;
+               Tuple2<IntWritable,IntWritable> outT = new 
Tuple2<IntWritable,IntWritable>();
+               @Override
+               public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, 
Text> v)
+               throws Exception {
+                       outT.f0 = v.f0;
+                       outT.f1 = new IntWritable(1);
+                       return outT;
+               }
+       }
+
+       public static class Mapper4 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() % 5);
+                       return v;
+               }
+       }
+}
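
For readers who want to use the wrapper outside of the test harness, a minimal sketch of a standalone batch job follows. It is not part of this commit: the SumJob class name, the input data, and the inlined SumReducer (same shape as the test's reducer above) are illustrative assumptions; only the HadoopReduceCombineFunction constructor call mirrors the committed code.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumJob {

    // Same shape as the SumReducer in the test above: sums all values per key.
    public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        public void reduce(IntWritable k, Iterator<IntWritable> v,
                OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
            int sum = 0;
            while (v.hasNext()) {
                sum += v.next().get();
            }
            out.collect(k, new IntWritable(sum));
        }
        @Override
        public void configure(JobConf conf) { }
        @Override
        public void close() throws IOException { }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Illustrative input; any DataSet<Tuple2<IntWritable, IntWritable>> works.
        DataSet<Tuple2<IntWritable, IntWritable>> input = env.fromElements(
                new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(10)),
                new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(20)),
                new Tuple2<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(5)));

        // The same mapred Reducer serves as reducer and combiner, mirroring the tests above.
        DataSet<Tuple2<IntWritable, IntWritable>> sums = input
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                        new SumReducer(), new SumReducer()));

        sums.print();
    }
}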

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
new file mode 100644
index 0000000..abc0e9c
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopReduceFunctionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testStandardGrouping() throws Exception{
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper1());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
CommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,3)\n" +
+                               "(2,5)\n" +
+                               "(3,5)\n" +
+                               "(4,2)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testUngroupedHadoopReducer() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env);
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new 
AllCommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(42,15)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurationViaJobConf() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.cntPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = 
HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper2());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new 
HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+                                               new ConfigurableCntReducer(), 
conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               helloCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,0)\n" +
+                               "(2,1)\n" +
+                               "(3,1)\n" +
+                               "(4,1)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+       public static class CommentCntReducer implements Reducer<IntWritable, 
Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class AllCommentCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(new IntWritable(42), new 
IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableCntReducer implements 
Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               private String countPrefix;
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, 
OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith(this.countPrefix)) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf c) { 
+                       this.countPrefix = c.get("my.cntPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+
+       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() / 5);
+                       return v;
+               }
+       }
+
+       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, 
Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> 
v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() % 5);
+                       return v;
+               }
+       }
+}
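
As a companion to testConfigurationViaJobConf above, the following sketch shows how parameters travel from a JobConf into a wrapped Reducer's configure() method. It is not part of this commit: the PrefixCountJob class, the PrefixCntReducer (same shape as the ConfigurableCntReducer above), and the input data are illustrative assumptions.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class PrefixCountJob {

    // Counts values that start with a prefix read from the JobConf, like the ConfigurableCntReducer above.
    public static class PrefixCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        private String prefix;

        @Override
        public void configure(JobConf conf) {
            // The value set on the JobConf in main() arrives here.
            this.prefix = conf.get("my.cntPrefix");
        }

        @Override
        public void reduce(IntWritable k, Iterator<Text> vs,
                OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
            int cnt = 0;
            while (vs.hasNext()) {
                if (vs.next().toString().startsWith(prefix)) {
                    cnt++;
                }
            }
            out.collect(k, new IntWritable(cnt));
        }

        @Override
        public void close() throws IOException { }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Parameters set on the JobConf are shipped with the wrapper and handed to configure().
        JobConf conf = new JobConf();
        conf.set("my.cntPrefix", "Hello");

        // Illustrative input; any DataSet<Tuple2<IntWritable, Text>> works.
        DataSet<Tuple2<IntWritable, Text>> input = env.fromElements(
                new Tuple2<IntWritable, Text>(new IntWritable(0), new Text("Hello world")),
                new Tuple2<IntWritable, Text>(new IntWritable(0), new Text("Comment#1")),
                new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hello")));

        DataSet<Tuple2<IntWritable, IntWritable>> prefixCnts = input
                .groupBy(0)
                .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                        new PrefixCntReducer(), conf));

        prefixCnts.print();
    }
}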

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
new file mode 100644
index 0000000..eed6f8f
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+public class HadoopTestData {
+
+       public static DataSet<Tuple2<IntWritable, Text>> 
getKVPairDataSet(ExecutionEnvironment env) {
+               
+               List<Tuple2<IntWritable, Text>> data = new 
ArrayList<Tuple2<IntWritable, Text>>();
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new 
Text("Hi")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new 
Text("Hello")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new 
Text("Hello world")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new 
Text("Hello world, how are you?")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new 
Text("I am fine.")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new 
Text("Luke Skywalker")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new 
Text("Comment#1")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new 
Text("Comment#2")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new 
Text("Comment#3")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new 
Text("Comment#4")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new 
Text("Comment#5")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new 
Text("Comment#6")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new 
Text("Comment#7")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new 
Text("Comment#8")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new 
Text("Comment#9")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new 
Text("Comment#10")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new 
Text("Comment#11")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new 
Text("Comment#12")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new 
Text("Comment#13")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new 
Text("Comment#14")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new 
Text("Comment#15")));
+               
+               Collections.shuffle(data);
+               
+               return env.fromCollection(data);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
new file mode 100644
index 0000000..524318c
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
+
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HadoopTupleUnwrappingIteratorTest {
+
+       @Test
+       public void testValueIterator() {
+               
+               HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
+                               new HadoopTupleUnwrappingIterator<IntWritable, 
IntWritable>(new WritableSerializer
+                                               
<IntWritable>(IntWritable.class));
+               
+               // many values
+               
+               ArrayList<Tuple2<IntWritable, IntWritable>> tList = new 
ArrayList<Tuple2<IntWritable, IntWritable>>();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(2)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(3)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(6)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(1),new IntWritable(8)));
+               
+               int expectedKey = 1;
+               int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
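+                       // hasNext() is called twice on purpose to check that it does not advance the iterator.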
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // one value
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(2),new IntWritable(10)));
+               
+               expectedKey = 2;
+               expectedValues = new int[]{10};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // more values
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(10)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(3),new IntWritable(9)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(21)));
+               
+               expectedKey = 3;
+               expectedValues = new int[]{10,4,7,9,21};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == 
expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // no hasNext() calls before next()
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(8)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(42)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(-1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new 
IntWritable(4),new IntWritable(0)));
+               
+               expectedKey = 4;
+               expectedValues = new int[]{5,8,42,-1,0};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+               }
+               try {
+                       valIt.next();
+                       Assert.fail();
+               } catch (NoSuchElementException nsee) {
+                       // expected
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+       }
+       
+}
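
For reference, a short sketch of how the unwrapping iterator presents one grouped batch of Tuple2 records as a Hadoop-style value iterator with a stable current key. It mirrors the test above and is illustrative only; the type-parameter order and constructor follow the usage shown in the test.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnwrappingIteratorSketch {

    public static void main(String[] args) {
        // The key serializer lets the iterator keep a defensive copy of the current key.
        HadoopTupleUnwrappingIterator<IntWritable, Text> valIt =
                new HadoopTupleUnwrappingIterator<IntWritable, Text>(
                        new WritableSerializer<IntWritable>(IntWritable.class));

        // One group of (key, value) tuples, all sharing the key 7.
        List<Tuple2<IntWritable, Text>> group = Arrays.asList(
                new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("a")),
                new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("b")));

        valIt.set(group.iterator());
        System.out.println("key: " + valIt.getCurrentKey());   // key of the group
        while (valIt.hasNext()) {
            System.out.println("value: " + valIt.next());      // "a", then "b"
        }
    }
}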

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..9b4aeea
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+       
+       protected String textPath;
+       protected String resultPath;
+       
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               this.setParallelism(4);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[]{".", "_"});
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               WordCount.main(new String[] { textPath, resultPath });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/pom.xml 
b/flink-batch-connectors/flink-hbase/pom.xml
new file mode 100644
index 0000000..ba4cf85
--- /dev/null
+++ b/flink-batch-connectors/flink-hbase/pom.xml
@@ -0,0 +1,220 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-batch-connectors</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hbase</artifactId>
+       <name>flink-hbase</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version>
+               <hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>${shading-artifact.name}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-core</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-compatibility</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.flink</groupId>
+                                       
<artifactId>flink-shaded-include-yarn</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- HBase server needed for TableOutputFormat -->
+               <!-- TODO implement bulk output format for HBase -->
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <version>${hbase.version}</version>
+                       <exclusions>
+                               <!-- Remove unneeded dependency, which conflicts with our jetty-util version. -->
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-util</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-sslengine</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jsp-2.1</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jsp-api-2.1</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>servlet-api-2.5</artifactId>
+                               </exclusion>
+                               <!-- The hadoop dependencies are handled 
through flink-shaded-hadoop -->
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-auth</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       
<artifactId>hadoop-annotations</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       
<artifactId>hadoop-mapreduce-client-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-client</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-hdfs</artifactId>
+                               </exclusion>
+                               <!-- Bug in hbase annotations, can be removed 
when fixed. See FLINK-2153. -->
+                               <exclusion>
+                                       <groupId>org.apache.hbase</groupId>
+                                       
<artifactId>hbase-annotations</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>hadoop-1</id>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop1' 
comment. See ./tools/generate_specific_pom.sh -->
+                                       
<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+                               </property>
+                       </activation>
+                       <properties>
+                               
<hbase.version>${hbase.hadoop1.version}</hbase.version>
+                       </properties>
+               </profile>
+               
+               <profile>
+                       <id>hadoop-2</id>
+                       <repositories>
+                               <repository>
+                                       <id>hadoop-2-repo2</id>
+                                       
<url>https://repo.maven.apache.org/maven2</url>
+                                       <releases>
+                                               <enabled>true</enabled>
+                                       </releases>
+                                       <snapshots>
+                                               <enabled>false</enabled>
+                                       </snapshots>
+                               </repository>
+                       </repositories>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop2' 
comment. See ./tools/generate_specific_pom.sh -->
+                                       
<!--hadoop2--><name>!hadoop.profile</name>
+                               </property>
+                       </activation>
+                       <properties>
+                               
<hbase.version>${hbase.hadoop2.version}</hbase.version>
+                       </properties>
+               </profile>
+
+               <profile>
+                       <id>cdh5.1.3</id>
+                       <properties>
+                               <hadoop.profile>2</hadoop.profile>
+                               <hbase.version>0.98.1-cdh5.1.3</hbase.version>
+                               <hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
+                               <!-- Cloudera uses different versions for hadoop-core and commons. -->
+                               <!-- This profile could be removed if Cloudera fixes this mismatch. -->
+                               
<hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
+                       </properties>
+                       <dependencyManagement>
+                               <dependencies>
+                                       <dependency>
+                                               
<groupId>org.apache.hadoop</groupId>
+                                               
<artifactId>hadoop-core</artifactId>
+                                               
<version>${hadoop.core.version}</version>
+                                       </dependency>
+                               </dependencies>
+                       </dependencyManagement>
+               </profile>
+
+       </profiles>
+
+</project>
