http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..3a95d94
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableComparator<T extends Writable & Comparable<T>> extends 
TypeComparator<T> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private Class<T> type;
+       
+       private final boolean ascendingComparison;
+       
+       private transient T reference;
+       
+       private transient T tempReference;
+       
+       private transient Kryo kryo;
+
+       @SuppressWarnings("rawtypes")
+       private final TypeComparator[] comparators = new TypeComparator[] 
{this};
+
+       public WritableComparator(boolean ascending, Class<T> type) {
+               this.type = type;
+               this.ascendingComparison = ascending;
+       }
+       
+       @Override
+       public int hash(T record) {
+               return record.hashCode();
+       }
+       
+       @Override
+       public void setReference(T toCompare) {
+               checkKryoInitialized();
+
+               reference = KryoUtils.copy(toCompare, kryo, new 
WritableSerializer<T>(type));
+       }
+       
+       @Override
+       public boolean equalToReference(T candidate) {
+               return candidate.equals(reference);
+       }
+       
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               T otherRef = ((WritableComparator<T>) 
referencedComparator).reference;
+               int comp = otherRef.compareTo(reference);
+               return ascendingComparison ? comp : -comp;
+       }
+       
+       @Override
+       public int compare(T first, T second) {
+               int comp = first.compareTo(second);
+               return ascendingComparison ? comp : -comp;
+       }
+       
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               ensureReferenceInstantiated();
+               ensureTempReferenceInstantiated();
+               
+               reference.readFields(firstSource);
+               tempReference.readFields(secondSource);
+               
+               int comp = reference.compareTo(tempReference);
+               return ascendingComparison ? comp : -comp;
+       }
+       
+       @Override
+       public boolean supportsNormalizedKey() {
+               return NormalizableKey.class.isAssignableFrom(type);
+       }
+       
+       @Override
+       public int getNormalizeKeyLen() {
+               ensureReferenceInstantiated();
+               
+               NormalizableKey<?> key = (NormalizableKey<?>) reference;
+               return key.getMaxNormalizedKeyLen();
+       }
+       
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+       
+       @Override
+       public void putNormalizedKey(T record, MemorySegment target, int 
offset, int numBytes) {
+               NormalizableKey<?> key = (NormalizableKey<?>) record;
+               key.copyNormalizedKey(target, offset, numBytes);
+       }
+       
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+       
+       @Override
+       public TypeComparator<T> duplicate() {
+               return new WritableComparator<T>(ascendingComparison, type);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               return comparators;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+       
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
+               throw new UnsupportedOperationException();
+       }
+       
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
+               throw new UnsupportedOperationException();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private void checkKryoInitialized() {
+               if (this.kryo == null) {
+                       this.kryo = new Kryo();
+
+                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
+                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
+                       kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+                       this.kryo.setAsmEnabled(true);
+                       this.kryo.register(type);
+               }
+       }
+       
+       private void ensureReferenceInstantiated() {
+               if (reference == null) {
+                       reference = InstantiationUtil.instantiate(type, 
Writable.class);
+               }
+       }
+       
+       private void ensureTempReferenceInstantiated() {
+               if (tempReference == null) {
+                       tempReference = InstantiationUtil.instantiate(type, 
Writable.class);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
new file mode 100644
index 0000000..9036d75
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private final Class<T> typeClass;
+       
+       private transient Kryo kryo;
+       
+       private transient T copyInstance;
+       
+       public WritableSerializer(Class<T> typeClass) {
+               this.typeClass = typeClass;
+       }
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       public T createInstance() {
+               if(typeClass == NullWritable.class) {
+                       return (T) NullWritable.get();
+               }
+               return InstantiationUtil.instantiate(typeClass);
+       }
+
+
+       
+       @Override
+       public T copy(T from) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, kryo, this);
+       }
+       
+       @Override
+       public T copy(T from, T reuse) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, reuse, kryo, this);
+       }
+       
+       @Override
+       public int getLength() {
+               return -1;
+       }
+       
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               record.write(target);
+       }
+       
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               return deserialize(createInstance(), source);
+       }
+       
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               reuse.readFields(source);
+               return reuse;
+       }
+       
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               ensureInstanceInstantiated();
+               copyInstance.readFields(source);
+               copyInstance.write(target);
+       }
+       
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+       
+       @Override
+       public WritableSerializer<T> duplicate() {
+               return new WritableSerializer<T>(typeClass);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private void ensureInstanceInstantiated() {
+               if (copyInstance == null) {
+                       copyInstance = createInstance();
+               }
+       }
+       
+       private void checkKryoInitialized() {
+               if (this.kryo == null) {
+                       this.kryo = new Kryo();
+
+                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
+                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
+                       kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+                       this.kryo.setAsmEnabled(true);
+                       this.kryo.register(typeClass);
+               }
+       }
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public int hashCode() {
+               return this.typeClass.hashCode();
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof WritableSerializer) {
+                       WritableSerializer<?> other = (WritableSerializer<?>) 
obj;
+
+                       return other.canEqual(this) && typeClass == 
other.typeClass;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof WritableSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with 
Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop {@link 
org.apache.hadoop.mapred.InputFormat}
+ * and {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the 
first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key 
and the second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
+ *
+ */
+
+public final class HadoopInputs {
+       // ----------------------------------- Hadoop Input Format 
---------------------------------------
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapred.FileInputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+        */
+       public static <K,V> HadoopInputFormat<K, V> 
readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, String inputPath, JobConf job) {
+               // set input path in JobConf
+               org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new 
org.apache.hadoop.fs.Path(inputPath));
+               // return wrapping InputFormat
+               return createHadoopInput(mapredInputFormat, key, value, job);
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapred.FileInputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+        */
+       public static <K,V> HadoopInputFormat<K, V> 
readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, String inputPath) {
+               return readHadoopFile(mapredInputFormat, key, value, inputPath, 
new JobConf());
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} to read a Hadoop sequence file 
for the given key and value classes.
+        *
+        * @return A Flink InputFormat that wraps a Hadoop 
SequenceFileInputFormat.
+        */
+       public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> 
key, Class<V> value, String inputPath) throws IOException {
+               return readHadoopFile(new 
org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, 
inputPath);
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapred.InputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+        */
+       public static <K,V> HadoopInputFormat<K, V> 
createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, JobConf job) {
+               return new HadoopInputFormat<>(mapredInputFormat, key, value, 
job);
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+        */
+       public static <K,V> 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> 
readHadoopFile(
+                       
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> 
mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) 
throws IOException
+       {
+               // set input path in Job
+               
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new 
org.apache.hadoop.fs.Path(inputPath));
+               // return wrapping InputFormat
+               return createHadoopInput(mapreduceInputFormat, key, value, job);
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+        */
+       public static <K,V> 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> 
readHadoopFile(
+                       
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> 
mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws 
IOException
+       {
+               return readHadoopFile(mapreduceInputFormat, key, value, 
inputPath, Job.getInstance());
+       }
+
+       /**
+        * Creates a Flink {@link InputFormat} that wraps the given Hadoop 
{@link org.apache.hadoop.mapreduce.InputFormat}.
+        *
+        * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+        */
+       public static <K,V> 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> 
createHadoopInput(
+                       org.apache.hadoop.mapreduce.InputFormat<K,V> 
mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
+       {
+               return new 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat,
 key, value, job);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+       /**
+        * Returns {@link ParameterTool} for the arguments parsed by {@link 
GenericOptionsParser}
+        *
+        * @param args Input array arguments. It should be parsable by {@link 
GenericOptionsParser}
+        * @return A {@link ParameterTool}
+        * @throws IOException If arguments cannot be parsed by {@link 
GenericOptionsParser}
+        * @see GenericOptionsParser
+        */
+       public static ParameterTool paramsFromGenericOptionsParser(String[] 
args) throws IOException {
+               Option[] options = new 
GenericOptionsParser(args).getCommandLine().getOptions();
+               Map<String, String> map = new HashMap<String, String>();
+               for (Option option : options) {
+                       String[] split = option.getValue().split("=");
+                       map.put(split[0], split[1]);
+               }
+               return ParameterTool.fromMap(map);
+       }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
new file mode 100644
index 0000000..ba8aa90
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+                                       extends 
RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
+                                       implements 
ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+       private transient JobConf jobConf;
+
+       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> 
outputCollector;
+       private transient Reporter reporter;
+       
+       /**
+        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+        * 
+        * @param hadoopMapper The Hadoop Mapper to wrap.
+        */
+       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopMapper) {
+               this(hadoopMapper, new JobConf());
+       }
+       
+       /**
+        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+        * The Hadoop Mapper is configured with the provided JobConf.
+        * 
+        * @param hadoopMapper The Hadoop Mapper to wrap.
+        * @param conf The JobConf that is used to configure the Hadoop Mapper.
+        */
+       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopMapper, JobConf conf) {
+               if(hadoopMapper == null) {
+                       throw new NullPointerException("Mapper may not be 
null.");
+               }
+               if(conf == null) {
+                       throw new NullPointerException("JobConf may not be 
null.");
+               }
+               
+               this.mapper = hadoopMapper;
+               this.jobConf = conf;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.mapper.configure(jobConf);
+               
+               this.reporter = new HadoopDummyReporter();
+               this.outputCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
+       }
+
+       @Override
+       public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final 
Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
+                       throws Exception {
+               outputCollector.setFlinkCollector(out);
+               mapper.map(value.f0, value.f1, outputCollector, reporter);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {     
+               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+               Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, 
mapper.getClass(), 3);
+               
+               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, 
valueTypleInfo);
+       }
+       
+       /**
+        * Custom serialization methods.
+        * @see <a 
href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html";>http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+        */
+       private void writeObject(final ObjectOutputStream out) throws 
IOException {
+               out.writeObject(mapper.getClass());
+               jobConf.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
+                               
(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+               mapper = InstantiationUtil.instantiate(mapperClass);
+               
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..c1acc2b
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a 
combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT> 
+       extends 
RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+       implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, 
Tuple2<KEYIN,VALUEIN>>,
+                               ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, 
Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+       private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+       private transient JobConf jobConf;
+       
+       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> 
valueIterator;
+       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> 
reduceCollector;
+       private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+       private transient Reporter reporter;
+
+       /**
+        * Maps two Hadoop Reducer (mapred API) to a combinable Flink 
GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer that is mapped to a 
GroupReduceFunction.
+        * @param hadoopCombiner The Hadoop Reducer that is mapped to the 
combiner function.
+        */
+       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT> hadoopReducer,
+                                                                               
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+               this(hadoopReducer, hadoopCombiner, new JobConf());
+       }
+       
+       /**
+        * Maps two Hadoop Reducer (mapred API) to a combinable Flink 
GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer that is mapped to a 
GroupReduceFunction.
+        * @param hadoopCombiner The Hadoop Reducer that is mapped to the 
combiner function.
+        * @param conf The JobConf that is used to configure both Hadoop 
Reducers.
+        */
+       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT> hadoopReducer,
+                                                               
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+               if(hadoopReducer == null) {
+                       throw new NullPointerException("Reducer may not be 
null.");
+               }
+               if(hadoopCombiner == null) {
+                       throw new NullPointerException("Combiner may not be 
null.");
+               }
+               if(conf == null) {
+                       throw new NullPointerException("JobConf may not be 
null.");
+               }
+               
+               this.reducer = hadoopReducer;
+               this.combiner = hadoopCombiner;
+               this.jobConf = conf;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.reducer.configure(jobConf);
+               this.combiner.configure(jobConf);
+               
+               this.reporter = new HadoopDummyReporter();
+               Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new 
HadoopTupleUnwrappingIterator<>(keySerializer);
+               this.combineCollector = new HadoopOutputCollector<>();
+               this.reduceCollector = new HadoopOutputCollector<>();
+       }
+
+       @Override
+       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+                       throws Exception {
+               reduceCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, 
reduceCollector, reporter);
+       }
+
+       @Override
+       public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+               combineCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               combiner.reduce(valueIterator.getCurrentKey(), valueIterator, 
combineCollector, reporter);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+               Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 3);
+
+               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass(outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass(outValClass);
+               return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
+       }
+
+       /**
+        * Custom serialization methods.
+        * @see <a 
href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html";>http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+        */
+       private void writeObject(final ObjectOutputStream out) throws 
IOException {
+               
+               out.writeObject(reducer.getClass());
+               out.writeObject(combiner.getClass());
+               jobConf.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+                               
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+               reducer = InstantiationUtil.instantiate(reducerClass);
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
+                               
(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+               combiner = InstantiationUtil.instantiate(combinerClass);
+               
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..55aea24
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink 
GroupReduceFunction. 
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+                                       extends 
RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+                                       implements 
ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+       private transient JobConf jobConf;
+       
+       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> 
valueIterator;
+       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> 
reduceCollector;
+       private transient Reporter reporter;
+       
+       /**
+        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink 
GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer to wrap.
+        */
+       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopReducer) {
+               this(hadoopReducer, new JobConf());
+       }
+       
+       /**
+        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink 
GroupReduceFunction.
+        * 
+        * @param hadoopReducer The Hadoop Reducer to wrap.
+        * @param conf The JobConf that is used to configure the Hadoop Reducer.
+        */
+       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopReducer, JobConf conf) {
+               if(hadoopReducer == null) {
+                       throw new NullPointerException("Reducer may not be 
null.");
+               }
+               if(conf == null) {
+                       throw new NullPointerException("JobConf may not be 
null.");
+               }
+               
+               this.reducer = hadoopReducer;
+               this.jobConf = conf;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.reducer.configure(jobConf);
+               
+               this.reporter = new HadoopDummyReporter();
+               this.reduceCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
+               Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(keySerializer);
+       }
+
+       @Override
+       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+                       throws Exception {
+               
+               reduceCollector.setFlinkCollector(out);
+               valueIterator.set(values.iterator());
+               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, 
reduceCollector, reporter);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+               Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 3);
+
+               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, 
valueTypleInfo);
+       }
+
+       /**
+        * Custom serialization methods
+        * @see <a 
href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html";>http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+        */
+       private void writeObject(final ObjectOutputStream out) throws 
IOException {
+               
+               out.writeObject(reducer.getClass());
+               jobConf.write(out);             
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               
+               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+                               
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+               reducer = InstantiationUtil.instantiate(reducerClass);
+               
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..bfe03d3
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink OutputCollector.
+ * On each call of collect() the data is forwarded to the wrapped Flink 
collector.
+ */
+public final class HadoopOutputCollector<KEY,VALUE> implements 
OutputCollector<KEY,VALUE> {
+
+       private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+
+       private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+       /**
+        * Set the wrapped Flink collector.
+        * 
+        * @param flinkCollector The wrapped Flink OutputCollector.
+        */
+       public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> 
flinkCollector) {
+               this.flinkCollector = flinkCollector;
+       }
+       
+       /**
+        * Use the wrapped Flink collector to collect a key-value pair for 
Flink. 
+        * 
+        * @param key the key to collect
+        * @param val the value to collect
+        * @throws IOException unexpected of key or value in key-value pair.
+        */
+       @Override
+       public void collect(final KEY key, final VALUE val) throws IOException {
+               this.outTuple.f0 = key;
+               this.outTuple.f1 = val;
+               this.flinkCollector.collect(outTuple);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..2d204b8
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the 
second (value) field.
+ */
+public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
+               extends TupleUnwrappingIterator<VALUE, KEY> implements 
java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final TypeSerializer<KEY> keySerializer;
+
+       private transient Iterator<Tuple2<KEY,VALUE>> iterator;
+       
+       private transient KEY curKey;
+       private transient VALUE firstValue;
+       private transient boolean atFirst;
+
+       public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) 
{
+               this.keySerializer = checkNotNull(keySerializer);
+       }
+       
+       /**
+        * Set the Flink iterator to wrap.
+        * 
+        * @param iterator The Flink iterator to wrap.
+        */
+       @Override
+       public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+               this.iterator = iterator;
+               if(this.hasNext()) {
+                       final Tuple2<KEY, VALUE> tuple = iterator.next();
+                       this.curKey = keySerializer.copy(tuple.f0);
+                       this.firstValue = tuple.f1;
+                       this.atFirst = true;
+               } else {
+                       this.atFirst = false;
+               }
+       }
+       
+       @Override
+       public boolean hasNext() {
+               if(this.atFirst) {
+                       return true;
+               }
+               return iterator.hasNext();
+       }
+       
+       @Override
+       public VALUE next() {
+               if(this.atFirst) {
+                       this.atFirst = false;
+                       return firstValue;
+               }
+               
+               final Tuple2<KEY, VALUE> tuple = iterator.next();
+               return tuple.f1;
+       }
+       
+       public KEY getCurrentKey() {
+               return this.curKey;
+       }
+       
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
 
b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => 
MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with 
Apache Flink.
+  *
+  * It provides methods to create Flink InputFormat wrappers for Hadoop
+  * [[org.apache.hadoop.mapred.InputFormat]] and 
[[org.apache.hadoop.mapreduce.InputFormat]].
+  *
+  * Key value pairs produced by the Hadoop InputFormats are converted into 
[[Tuple2]] where
+  * the first field is the key and the second field is the value.
+  *
+  */
+object HadoopInputs {
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
+
+    // set input path in JobConf
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapredInputFormat
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
reads a Hadoop sequence
+    * file with the given key and value classes.
+    */
+  def readSequenceFile[K, V](
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(
+      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+      key,
+      value,
+      inputPath
+    )
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
+
+    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] = {
+
+    // set input path in Job
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapreduceInputFormat
+    createHadoopInput(mapreduceInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] =
+  {
+    readHadoopFile(mapreduceInputFormat, key, value, inputPath, 
Job.getInstance)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapreduceInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] = {
+
+    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, 
job)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
new file mode 100644
index 0000000..2aefd9f
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class WritableExtractionTest {
+
+       @Test
+       public void testDetectWritable() {
+               // writable interface itself must not be writable
+               assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
+
+               // various forms of extension
+               
assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
+               
assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
+               
assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
+
+               // some non-writables
+               assertFalse(TypeExtractor.isHadoopWritable(String.class));
+               assertFalse(TypeExtractor.isHadoopWritable(List.class));
+               
assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
+       }
+
+       @Test
+       public void testCreateWritableInfo() {
+               TypeInformation<DirectWritable> info1 =
+                               
TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
+               assertEquals(DirectWritable.class, info1.getTypeClass());
+
+               TypeInformation<ViaInterfaceExtension> info2 =
+                               
TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
+               assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
+
+               TypeInformation<ViaAbstractClassExtension> info3 = 
+                               
TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
+               assertEquals(ViaAbstractClassExtension.class, 
info3.getTypeClass());
+       }
+
+       @Test
+       public void testValidateTypeInfo() {
+               // validate unrelated type info
+               
TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
+
+               // validate writable type info correctly
+               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+                               DirectWritable.class), DirectWritable.class);
+               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+                               ViaInterfaceExtension.class), 
ViaInterfaceExtension.class);
+               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+                               ViaAbstractClassExtension.class), 
ViaAbstractClassExtension.class);
+
+               // incorrect case: not writable at all
+               try {
+                       TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+                                       DirectWritable.class), String.class);
+                       fail("should have failed with an exception");
+               } catch (InvalidTypesException e) {
+                       // expected
+               }
+
+               // incorrect case: wrong writable
+               try {
+                       TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+                                       ViaInterfaceExtension.class), 
DirectWritable.class);
+                       fail("should have failed with an exception");
+               } catch (InvalidTypesException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testExtractFromFunction() {
+               RichMapFunction<DirectWritable, DirectWritable> function = new 
RichMapFunction<DirectWritable, DirectWritable>() {
+                       @Override
+                       public DirectWritable map(DirectWritable value) throws 
Exception {
+                               return null;
+                       }
+               };
+
+               TypeInformation<DirectWritable> outType = 
+                               TypeExtractor.getMapReturnTypes(function, new 
WritableTypeInfo<>(DirectWritable.class));
+
+               assertTrue(outType instanceof WritableTypeInfo);
+               assertEquals(DirectWritable.class, outType.getTypeClass());
+       }
+
+       @Test
+       public void testExtractAsPartOfPojo() {
+               PojoTypeInfo<PojoWithWritable> pojoInfo = 
+                               (PojoTypeInfo<PojoWithWritable>) 
TypeExtractor.getForClass(PojoWithWritable.class);
+
+               boolean foundWritable = false;
+               for (int i = 0; i < pojoInfo.getArity(); i++) {
+                       PojoField field = pojoInfo.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       
+                       if (name.equals("hadoopCitizen")) {
+                               if (foundWritable) {
+                                       fail("already seen");
+                               }
+                               foundWritable = true;
+                               assertEquals(new 
WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
+                               assertEquals(DirectWritable.class, 
field.getTypeInformation().getTypeClass());
+                               
+                       }
+               }
+               
+               assertTrue("missed the writable type", foundWritable);
+       }
+
+       @Test
+       public void testInputValidationError() {
+
+               RichMapFunction<Writable, String> function = new 
RichMapFunction<Writable, String>() {
+                       @Override
+                       public String map(Writable value) throws Exception {
+                               return null;
+                       }
+               };
+
+               @SuppressWarnings("unchecked")
+               TypeInformation<Writable> inType = 
+                               (TypeInformation<Writable>) 
(TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
+               
+               try {
+                       TypeExtractor.getMapReturnTypes(function, inType);
+                       fail("exception expected");
+               }
+               catch (InvalidTypesException e) {
+                       // right
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test type classes
+       // 
------------------------------------------------------------------------
+
+       public interface ExtendedWritable extends Writable {}
+
+       public static abstract class AbstractWritable implements Writable {}
+
+       public static class DirectWritable implements Writable {
+
+               @Override
+               public void write(DataOutput dataOutput) throws IOException {}
+
+               @Override
+               public void readFields(DataInput dataInput) throws IOException 
{}
+       }
+
+       public static class ViaInterfaceExtension implements ExtendedWritable {
+
+               @Override
+               public void write(DataOutput dataOutput) throws IOException {}
+
+               @Override
+               public void readFields(DataInput dataInput) throws IOException 
{}
+       }
+
+       public static class ViaAbstractClassExtension extends AbstractWritable {
+
+               @Override
+               public void write(DataOutput dataOutput) throws IOException {}
+
+               @Override
+               public void readFields(DataInput dataInput) throws IOException 
{}
+       }
+
+       public static class PojoWithWritable {
+               public String str;
+               public DirectWritable hadoopCitizen;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
new file mode 100644
index 0000000..3d2b652
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.Writable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class WritableInfoParserTest {
+
+       @Test
+       public void testWritableType() {
+               TypeInformation<?> ti = TypeInfoParser.parse(
+                               
"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
+
+               Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
+               Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) 
ti).getTypeClass());
+       }
+
+       @Test
+       public void testPojoWithWritableType() {
+               TypeInformation<?> ti = TypeInfoParser.parse(
+                               
"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
+                               + "basic=Integer,"
+                               + "tuple=Tuple2<String, Integer>,"
+                               + 
"hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
+                               + "array=String[]"
+                               + ">");
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+               Assert.assertEquals("array", 
pti.getPojoFieldAt(0).getField().getName());
+               Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() 
instanceof BasicArrayTypeInfo);
+               Assert.assertEquals("basic", 
pti.getPojoFieldAt(1).getField().getName());
+               Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() 
instanceof BasicTypeInfo);
+               Assert.assertEquals("hadoopCitizen", 
pti.getPojoFieldAt(2).getField().getName());
+               Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() 
instanceof WritableTypeInfo);
+               Assert.assertEquals("tuple", 
pti.getPojoFieldAt(3).getField().getName());
+               Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() 
instanceof TupleTypeInfo);
+       }
+       // 
------------------------------------------------------------------------
+       //  Test types
+       // 
------------------------------------------------------------------------
+
+       public static class MyWritable implements Writable {
+
+               @Override
+               public void write(DataOutput out) throws IOException {}
+
+               @Override
+               public void readFields(DataInput in) throws IOException {}
+       }
+
+       public static class MyPojo {
+               public Integer basic;
+               public Tuple2<String, Integer> tuple;
+               public MyWritable hadoopCitizen;
+               public String[] array;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 0000000..eb9cdf0
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class WritableTypeInfoTest extends TestLogger {
+       
+       @Test
+       public void testWritableTypeInfoEquality() {
+               WritableTypeInfo<TestClass> tpeInfo1 = new 
WritableTypeInfo<>(TestClass.class);
+               WritableTypeInfo<TestClass> tpeInfo2 = new 
WritableTypeInfo<>(TestClass.class);
+
+               assertEquals(tpeInfo1, tpeInfo2);
+               assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+       }
+
+       @Test
+       public void testWritableTypeInfoInequality() {
+               WritableTypeInfo<TestClass> tpeInfo1 = new 
WritableTypeInfo<>(TestClass.class);
+               WritableTypeInfo<AlternateClass> tpeInfo2 = new 
WritableTypeInfo<>(AlternateClass.class);
+
+               assertNotEquals(tpeInfo1, tpeInfo2);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test types
+       // 
------------------------------------------------------------------------
+
+       public static class TestClass implements Writable {
+
+               @Override
+               public void write(DataOutput dataOutput) throws IOException {}
+
+               @Override
+               public void readFields(DataInput dataInput) throws IOException 
{}
+       }
+
+       public static class AlternateClass implements Writable {
+
+               @Override
+               public void write(DataOutput dataOutput) throws IOException {}
+
+               @Override
+               public void readFields(DataInput dataInput) throws IOException 
{}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
new file mode 100644
index 0000000..c32f5da
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class StringArrayWritable implements Writable, 
Comparable<StringArrayWritable> {
+       
+       private String[] array = new String[0];
+       
+       public StringArrayWritable() {
+               super();
+       }
+       
+       public StringArrayWritable(String[] array) {
+               this.array = array;
+       }
+       
+       @Override
+       public void write(DataOutput out) throws IOException {
+               out.writeInt(this.array.length);
+               
+               for(String str : this.array) {
+                       byte[] b = str.getBytes();
+                       out.writeInt(b.length);
+                       out.write(b);
+               }
+       }
+       
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               this.array = new String[in.readInt()];
+               
+               for(int i = 0; i < this.array.length; i++) {
+                       byte[] b = new byte[in.readInt()];
+                       in.readFully(b);
+                       this.array[i] = new String(b);
+               }
+       }
+       
+       @Override
+       public int compareTo(StringArrayWritable o) {
+               if(this.array.length != o.array.length) {
+                       return this.array.length - o.array.length;
+               }
+               
+               for(int i = 0; i < this.array.length; i++) {
+                       int comp = this.array[i].compareTo(o.array[i]);
+                       if(comp != 0) {
+                               return comp;
+                       }
+               }
+               return 0;
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if(!(obj instanceof StringArrayWritable)) {
+                       return false;
+               }
+               return this.compareTo((StringArrayWritable) obj) == 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
new file mode 100644
index 0000000..96f844c
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class WritableComparatorTest extends 
ComparatorTestBase<StringArrayWritable> {
+       
+       StringArrayWritable[] data = new StringArrayWritable[]{
+                       new StringArrayWritable(new String[]{}),
+                       new StringArrayWritable(new String[]{""}),
+                       new StringArrayWritable(new String[]{"a","a"}),
+                       new StringArrayWritable(new String[]{"a","b"}),
+                       new StringArrayWritable(new String[]{"c","c"}),
+                       new StringArrayWritable(new String[]{"d","f"}),
+                       new StringArrayWritable(new String[]{"d","m"}),
+                       new StringArrayWritable(new String[]{"z","x"}),
+                       new StringArrayWritable(new String[]{"a","a", "a"})
+       };
+       
+       @Override
+       protected TypeComparator<StringArrayWritable> createComparator(boolean 
ascending) {
+               return new WritableComparator<StringArrayWritable>(ascending, 
StringArrayWritable.class);
+       }
+       
+       @Override
+       protected TypeSerializer<StringArrayWritable> createSerializer() {
+               return new 
WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
+       }
+       
+       @Override
+       protected StringArrayWritable[] getSortedTestData() {
+               return data;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> 
{
+       @Override
+       protected TypeComparator<WritableID> createComparator(boolean 
ascending) {
+               return new WritableComparator<>(ascending, WritableID.class);
+       }
+
+       @Override
+       protected TypeSerializer<WritableID> createSerializer() {
+               return new WritableSerializer<>(WritableID.class);
+       }
+
+       @Override
+       protected WritableID[] getSortedTestData() {
+               return new WritableID[] {
+                       new WritableID(new UUID(0, 0)),
+                       new WritableID(new UUID(1, 0)),
+                       new WritableID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+       private UUID uuid;
+
+       public WritableID() {
+               this.uuid = UUID.randomUUID();
+       }
+
+       public WritableID(UUID uuid) {
+               this.uuid = uuid;
+       }
+
+       @Override
+       public int compareTo(WritableID o) {
+               return this.uuid.compareTo(o.uuid);
+       }
+
+       @Override
+       public void write(DataOutput dataOutput) throws IOException {
+               dataOutput.writeLong(uuid.getMostSignificantBits());
+               dataOutput.writeLong(uuid.getLeastSignificantBits());
+       }
+
+       @Override
+       public void readFields(DataInput dataInput) throws IOException {
+               this.uuid = new UUID(dataInput.readLong(), 
dataInput.readLong());
+       }
+
+       @Override
+       public String toString() {
+               return uuid.toString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               WritableID id = (WritableID) o;
+
+               return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != 
null);
+       }
+
+       @Override
+       public int hashCode() {
+               return uuid != null ? uuid.hashCode() : 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
new file mode 100644
index 0000000..bb5f4d4
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Test;
+
+public class WritableSerializerTest {
+       
+       @Test
+       public void testStringArrayWritable() {
+               StringArrayWritable[] data = new StringArrayWritable[]{
+                               new StringArrayWritable(new String[]{}),
+                               new StringArrayWritable(new String[]{""}),
+                               new StringArrayWritable(new String[]{"a","a"}),
+                               new StringArrayWritable(new String[]{"a","b"}),
+                               new StringArrayWritable(new String[]{"c","c"}),
+                               new StringArrayWritable(new String[]{"d","f"}),
+                               new StringArrayWritable(new String[]{"d","m"}),
+                               new StringArrayWritable(new String[]{"z","x"}),
+                               new StringArrayWritable(new String[]{"a","a", 
"a"})
+               };
+               
+               WritableTypeInfo<StringArrayWritable> writableTypeInfo = 
(WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
+               WritableSerializer<StringArrayWritable> writableSerializer = 
(WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new 
ExecutionConfig());
+               
+               SerializerTestInstance<StringArrayWritable> testInstance = new 
SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(),
 -1, data);
+               
+               testInstance.testAll();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> 
{
+       @Override
+       protected TypeSerializer<WritableID> createSerializer() {
+               return new WritableSerializer<>(WritableID.class);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<WritableID> getTypeClass() {
+               return WritableID.class;
+       }
+
+       @Override
+       protected WritableID[] getTestData() {
+               return new WritableID[] {
+                       new WritableID(new UUID(0, 0)),
+                       new WritableID(new UUID(1, 0)),
+                       new WritableID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+
+       @Test
+       public void testParamsFromGenericOptionsParser() throws IOException {
+               ParameterTool parameter = 
HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", 
"-DexpectedCount=15"});
+               validate(parameter);
+       }
+}

Reply via email to