http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
deleted file mode 100644
index 6ef0f2e..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBase {
-
-       private static int NUM_PROGRAMS = 2;
-
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String[] resultPath;
-       private String[] expectedResult;
-       private String sequenceFileInPath;
-       private String sequenceFileInPathNull;
-
-       public HadoopIOFormatsITCase(Configuration config) {
-               super(config);  
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = new String[] {getTempDirPath("result0"), 
getTempDirPath("result1") };
-
-               File sequenceFile = createAndRegisterTempFile("seqFile");
-               sequenceFileInPath = sequenceFile.toURI().toString();
-
-               // Create a sequence file
-               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
-               FileSystem fs = 
FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
-               Path path = new Path(sequenceFile.getAbsolutePath());
-
-               //  ------------------ Long / Text Key Value pair: ------------
-               int kvCount = 4;
-
-               LongWritable key = new LongWritable();
-               Text value = new Text();
-               SequenceFile.Writer writer = null;
-               try {
-                       writer = SequenceFile.createWriter( fs, conf, path, 
key.getClass(), value.getClass());
-                       for (int i = 0; i < kvCount; i ++) {
-                               if(i == 1) {
-                                       // write key = 0 a bit more often.
-                                       for(int a = 0;a < 15; a++) {
-                                               key.set(i);
-                                               value.set(i+" - somestring");
-                                               writer.append(key, value);
-                                       }
-                               }
-                               key.set(i);
-                               value.set(i+" - somestring");
-                               writer.append(key, value);
-                       }
-               } finally {
-                       IOUtils.closeStream(writer);
-               }
-
-
-               //  ------------------ Long / Text Key Value pair: ------------
-
-               File sequenceFileNull = 
createAndRegisterTempFile("seqFileNullKey");
-               sequenceFileInPathNull = sequenceFileNull.toURI().toString();
-               path = new Path(sequenceFileInPathNull);
-
-               LongWritable value1 = new LongWritable();
-               SequenceFile.Writer writer1 = null;
-               try {
-                       writer1 = SequenceFile.createWriter( fs, conf, path, 
NullWritable.class, value1.getClass());
-                       for (int i = 0; i < kvCount; i ++) {
-                               value1.set(i);
-                               writer1.append(NullWritable.get(), value1);
-                       }
-               } finally {
-                       IOUtils.closeStream(writer1);
-               }
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, 
resultPath, sequenceFileInPath, sequenceFileInPathNull);
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               for(int i = 0; i < resultPath.length; i++) {
-                       compareResultsByLinesInMemory(expectedResult[i], 
resultPath[i]);
-               }
-       }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
-       }
-       
-       public static class HadoopIOFormatPrograms {
-               
-               public static String[] runProgram(int progId, String 
resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws 
Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               /**
-                                * Test sequence file, including a key access.
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                               SequenceFileInputFormat<LongWritable, Text> 
sfif = new SequenceFileInputFormat<LongWritable, Text>();
-                               JobConf hdconf = new JobConf();
-                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPath));
-                               HadoopInputFormat<LongWritable, Text> hif = new 
HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, 
hdconf);
-                               DataSet<Tuple2<LongWritable, Text>> ds = 
env.createInput(hif);
-                               DataSet<Tuple2<Long, Text>> sumed = ds.map(new 
MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
-                                       @Override
-                                       public Tuple2<Long, Text> 
map(Tuple2<LongWritable, Text> value) throws Exception {
-                                               return new Tuple2<Long, 
Text>(value.f0.get(), value.f1);
-                                       }
-                               }).sum(0);
-                               sumed.writeAsText(resultPath[0]);
-                               DataSet<String> res = ds.distinct(0).map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
-                                       @Override
-                                       public String map(Tuple2<LongWritable, 
Text> value) throws Exception {
-                                               return value.f1 + " - " + 
value.f0.get();
-                                       }
-                               });
-                               res.writeAsText(resultPath[1]);
-                               env.execute();
-                               
-                               // return expected result
-                               return  new String [] {"(21,3 - somestring)", 
"0 - somestring - 0\n" +
-                                               "1 - somestring - 1\n" +
-                                               "2 - somestring - 2\n" +
-                                               "3 - somestring - 3\n"};
-
-                       }
-                       case 2: {
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                               SequenceFileInputFormat<NullWritable, 
LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
-                               JobConf hdconf = new JobConf();
-                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPathNull));
-                               HadoopInputFormat<NullWritable, LongWritable> 
hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, 
NullWritable.class, LongWritable.class, hdconf);
-                               DataSet<Tuple2<NullWritable, LongWritable>> ds 
= env.createInput(hif);
-                               DataSet<Tuple2<Void, Long>> res = ds.map(new 
MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
-                                       @Override
-                                       public Tuple2<Void, Long> 
map(Tuple2<NullWritable, LongWritable> value) throws Exception {
-                                               return new Tuple2<Void, 
Long>(null, value.f1.get());
-                                       }
-                               });
-                               DataSet<Tuple2<Void, Long>> res1 = 
res.groupBy(1).sum(1);
-                               res1.writeAsText(resultPath[1]);
-                               res.writeAsText(resultPath[0]);
-                               env.execute();
-
-                               // return expected result
-                               return  new String [] {"(null,2)\n" +
-                                               "(null,0)\n" +
-                                               "(null,1)\n" +
-                                               "(null,3)",
-                                               "(null,0)\n" +
-                                               "(null,1)\n" +
-                                               "(null,2)\n" +
-                                               "(null,3)"};
-                       }
-                       default:
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
-                       
-               }
-       
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index cc5664b..f345591 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -152,6 +152,12 @@ under the License.
                                <dependency>
                                        <groupId>org.apache.hadoop</groupId>
                                        <artifactId>hadoop-core</artifactId>
+                                       <!--<exclusions>-->
+                                               <!--<exclusion>-->
+                                                       
<!--<groupId>*</groupId>-->
+                                                       
<!--<artifactId>*</artifactId>-->
+                                               <!--</exclusion>-->
+                                       <!--</exclusions>-->
                                </dependency>
                        </dependencies>
                </profile>
@@ -167,6 +173,22 @@ under the License.
                                <dependency>
                                        <groupId>org.apache.hadoop</groupId>
                                        <artifactId>hadoop-common</artifactId>
+                                       <!--<exclusions>-->
+                                               <!--<exclusion>-->
+                                                       
<!--<groupId>*</groupId>-->
+                                                       
<!--<artifactId>*</artifactId>-->
+                                               <!--</exclusion>-->
+                                       <!--</exclusions>-->
+                               </dependency>
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       
<artifactId>hadoop-mapreduce-client-core</artifactId>
+                                       <!--<exclusions>-->
+                                               <!--<exclusion>-->
+                                                       
<!--<groupId>*</groupId>-->
+                                                       
<!--<artifactId>*</artifactId>-->
+                                               <!--</exclusion>-->
+                                       <!--</exclusions>-->
                                </dependency>
                        </dependencies>
                </profile>

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6415570..81caa2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1265,8 +1265,7 @@ public abstract class DataSet<T> {
                this.context.registerDataSink(sink);
                return sink;
        }
-       
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Utilities
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 563787f..61a74b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -58,6 +59,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
 * The ExecutionEnvironment is the context in which a program is executed. A
@@ -400,6 +403,67 @@ public abstract class ExecutionEnvironment {
                
                return new DataSource<X>(this, inputFormat, producedType, 
Utils.getCallLocationName());
        }
+
+       // ----------------------------------- Hadoop Input Format 
---------------------------------------
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
+        * given inputPath is added to the given job.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, String inputPath, JobConf job) {
+               DataSource<Tuple2<K, V>> result = 
createHadoopInput(mapredInputFormat, key, value, job);
+
+               org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new 
org.apache.hadoop.fs.Path(inputPath));
+
+               return result;
+       }
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
+        * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, String inputPath) {
+               return readHadoopFile(mapredInputFormat, key, value, inputPath, 
new JobConf());
+       }
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, JobConf job) {
+               HadoopInputFormat<K, V> hadoopInputFormat = new 
HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+               return this.createInput(hadoopInputFormat);
+       }
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
+        * given inputPath is added to the given job.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> 
mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) 
throws IOException {
+               DataSource<Tuple2<K, V>> result = 
createHadoopInput(mapredInputFormat, key, value, job);
+
+               
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new 
org.apache
+                               .hadoop.fs.Path(inputPath));
+
+               return result;
+       }
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
+        * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> 
mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws 
IOException {
+               return readHadoopFile(mapredInputFormat, key, value, inputPath, 
Job.getInstance());
+       }
+
+       /**
+        * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+        */
+       public <K,V> DataSource<Tuple2<K, V>> 
createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> 
mapredInputFormat, Class<K> key, Class<V> value, Job job) {
+               org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, 
V> hadoopInputFormat = new 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, 
V>(mapredInputFormat, key, value, job);
+
+               return this.createInput(hadoopInputFormat);
+       }
        
        // ----------------------------------- Collection 
---------------------------------------
        

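For context, the new ExecutionEnvironment shortcuts above can be used roughly as follows. This is a minimal sketch, not part of the commit; the mapred TextInputFormat, the HDFS path and the print sink are placeholder choices.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ReadHadoopFileExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // readHadoopFile wraps the mapred input format and adds the path to a fresh JobConf
            DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///tmp/input");

            input.print();
            env.execute();
        }
    }
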
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
new file mode 100644
index 0000000..8b25249
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, 
Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> {
+       
+       public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> 
mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+               super(mapredInputFormat, key, value, job);
+       }
+       
+       @Override
+       public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+               if(!fetched) {
+                       fetchNext();
+               }
+               if(!hasNext) {
+                       return null;
+               }
+               record.f0 = key;
+               record.f1 = value;
+               fetched = false;
+               return record;
+       }
+       
+       @Override
+       public TypeInformation<Tuple2<K,V>> getProducedType() {
+               return new 
TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass));
+       }
+}

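The same source can also be built by instantiating the wrapper directly and passing it to createInput, which is what the createHadoopInput shortcut does internally (sketch; env and the placeholder input path as in the example above):

    JobConf conf = new JobConf();
    org.apache.hadoop.mapred.FileInputFormat.addInputPath(conf, new org.apache.hadoop.fs.Path("hdfs:///tmp/input"));

    HadoopInputFormat<LongWritable, Text> hif =
            new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, conf);

    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hif);
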
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
new file mode 100644
index 0000000..40f6631
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, 
HadoopInputSplit> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopInputFormatBase.class);
+
+       private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
+       protected Class<K> keyClass;
+       protected Class<V> valueClass;
+       private JobConf jobConf;
+
+       protected transient K key;
+       protected transient V value;
+
+       private transient RecordReader<K, V> recordReader;
+       protected transient boolean fetched = false;
+       protected transient boolean hasNext;
+
+       public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> 
mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+               super();
+               this.mapredInputFormat = mapredInputFormat;
+               this.keyClass = key;
+               this.valueClass = value;
+               HadoopUtils.mergeHadoopConf(job);
+               this.jobConf = job;
+               ReflectionUtils.setConf(mapredInputFormat, jobConf);
+       }
+       
+       public JobConf getJobConf() {
+               return jobConf;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  InputFormat
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public void configure(Configuration parameters) {
+               // nothing to do
+       }
+       
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws 
IOException {
+               // only gather base statistics for FileInputFormats
+               if(!(mapredInputFormat instanceof FileInputFormat)) {
+                       return null;
+               }
+               
+               final FileBaseStatistics cachedFileStats = (cachedStats != null 
&& cachedStats instanceof FileBaseStatistics) ?
+                               (FileBaseStatistics) cachedStats : null;
+               
+               try {
+                       final org.apache.hadoop.fs.Path[] paths = 
FileInputFormat.getInputPaths(this.jobConf);
+                       
+                       return getFileStats(cachedFileStats, paths, new 
ArrayList<FileStatus>(1));
+               } catch (IOException ioex) {
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn("Could not determine statistics due to 
an io error: "
+                                               + ioex.getMessage());
+                       }
+               } catch (Throwable t) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Unexpected problem while getting the 
file statistics: "
+                                               + t.getMessage(), t);
+                       }
+               }
+               
+               // no statistics available
+               return null;
+       }
+       
+       @Override
+       public HadoopInputSplit[] createInputSplits(int minNumSplits)
+                       throws IOException {
+               org.apache.hadoop.mapred.InputSplit[] splitArray = 
mapredInputFormat.getSplits(jobConf, minNumSplits);
+               HadoopInputSplit[] hiSplit = new 
HadoopInputSplit[splitArray.length];
+               for(int i=0;i<splitArray.length;i++){
+                       hiSplit[i] = new HadoopInputSplit(i, splitArray[i], 
jobConf);
+               }
+               return hiSplit;
+       }
+       
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] 
inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+       
+       @Override
+       public void open(HadoopInputSplit split) throws IOException {
+               this.recordReader = 
this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, 
new HadoopDummyReporter());
+               if (this.recordReader instanceof Configurable) {
+                       ((Configurable) this.recordReader).setConf(jobConf);
+               }
+               key = this.recordReader.createKey();
+               value = this.recordReader.createValue();
+               this.fetched = false;
+       }
+       
+       @Override
+       public boolean reachedEnd() throws IOException {
+               if(!fetched) {
+                       fetchNext();
+               }
+               return !hasNext;
+       }
+       
+       protected void fetchNext() throws IOException {
+               hasNext = this.recordReader.next(key, value);
+               fetched = true;
+       }
+
+       @Override
+       public void close() throws IOException {
+               this.recordReader.close();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Helper methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, 
org.apache.hadoop.fs.Path[] hadoopFilePaths,
+                       ArrayList<FileStatus> files) throws IOException {
+               
+               long latestModTime = 0L;
+               
+               // get the file info and check whether the cached statistics are still valid.
+               for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+                       
+                       final Path filePath = new Path(hadoopPath.toUri());
+                       final FileSystem fs = FileSystem.get(filePath.toUri());
+                       
+                       final FileStatus file = fs.getFileStatus(filePath);
+                       latestModTime = Math.max(latestModTime, 
file.getModificationTime());
+                       
+                       // enumerate all files and check their modification time stamp.
+                       if (file.isDir()) {
+                               FileStatus[] fss = fs.listStatus(filePath);
+                               files.ensureCapacity(files.size() + fss.length);
+                               
+                               for (FileStatus s : fss) {
+                                       if (!s.isDir()) {
+                                               files.add(s);
+                                               latestModTime = 
Math.max(s.getModificationTime(), latestModTime);
+                                       }
+                               }
+                       } else {
+                               files.add(file);
+                       }
+               }
+               
+               // check whether the cached statistics are still valid, if we have any
+               if (cachedStats != null && latestModTime <= 
cachedStats.getLastModificationTime()) {
+                       return cachedStats;
+               }
+               
+               // calculate the whole length
+               long len = 0;
+               for (FileStatus s : files) {
+                       len += s.getLen();
+               }
+               
+               // sanity check
+               if (len <= 0) {
+                       len = BaseStatistics.SIZE_UNKNOWN;
+               }
+               
+               return new FileBaseStatistics(latestModTime, len, 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom serialization methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeUTF(mapredInputFormat.getClass().getName());
+               out.writeUTF(keyClass.getName());
+               out.writeUTF(valueClass.getName());
+               jobConf.write(out);
+       }
+       
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               String hadoopInputFormatClassName = in.readUTF();
+               String keyClassName = in.readUTF();
+               String valueClassName = in.readUTF();
+               if(jobConf == null) {
+                       jobConf = new JobConf();
+               }
+               jobConf.readFields(in);
+               try {
+                       this.mapredInputFormat = 
(org.apache.hadoop.mapred.InputFormat<K,V>) 
Class.forName(hadoopInputFormatClassName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to instantiate the 
hadoop input format", e);
+               }
+               try {
+                       this.keyClass = (Class<K>) Class.forName(keyClassName, 
true, Thread.currentThread().getContextClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to find key class.", 
e);
+               }
+               try {
+                       this.valueClass = (Class<V>) 
Class.forName(valueClassName, true, 
Thread.currentThread().getContextClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to find value 
class.", e);
+               }
+               ReflectionUtils.setConf(mapredInputFormat, jobConf);
+       }
+}

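For readers unfamiliar with Flink's InputFormat contract, this is roughly how the runtime drives the base class above; a simplified sketch, not actual runtime code (hif as in the earlier example):

    HadoopInputSplit[] splits = hif.createInputSplits(4);
    for (HadoopInputSplit split : splits) {
        hif.open(split);                                  // obtains the Hadoop RecordReader for the split
        Tuple2<LongWritable, Text> reuse = new Tuple2<LongWritable, Text>();
        while (!hif.reachedEnd()) {                       // triggers fetchNext() on demand
            Tuple2<LongWritable, Text> record = hif.nextRecord(reuse);
            if (record != null) {
                // record.f0 is the key, record.f1 the value
            }
        }
        hif.close();
    }
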
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
new file mode 100644
index 0000000..75623e2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, 
Tuple2<K, V>> {
+
+       public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> 
mapredOutputFormat, JobConf job) {
+               super(mapredOutputFormat, job);
+       }
+
+       @Override
+       public void writeRecord(Tuple2<K, V> record) throws IOException {
+               this.recordWriter.write(record.f0, record.f1);
+       }
+}

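For symmetry with the input side, the write path looks roughly like this (sketch; the mapred TextOutputFormat, the output path and the key/value swap are placeholder choices, with input as in the earlier example):

    JobConf outConf = new JobConf();
    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(outConf, new org.apache.hadoop.fs.Path("hdfs:///tmp/output"));

    HadoopOutputFormat<Text, LongWritable> hof =
            new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), outConf);

    DataSet<Tuple2<Text, LongWritable>> result = input.map(
            new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Text, LongWritable>>() {
                @Override
                public Tuple2<Text, LongWritable> map(Tuple2<LongWritable, Text> value) {
                    return new Tuple2<Text, LongWritable>(value.f1, value.f0);   // swap key and value
                }
            });
    result.output(hof);
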
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
new file mode 100644
index 0000000..a59b96f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+
+public abstract class HadoopOutputFormatBase<K, V, T> implements 
OutputFormat<T>, FinalizeOnMaster {
+
+       private static final long serialVersionUID = 1L;
+
+       private JobConf jobConf;
+       private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
+       protected transient RecordWriter<K,V> recordWriter;
+       private transient FileOutputCommitter fileOutputCommitter;
+       private transient TaskAttemptContext context;
+       private transient JobContext jobContext;
+
+       public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, 
V> mapredOutputFormat, JobConf job) {
+               super();
+               this.mapredOutputFormat = mapredOutputFormat;
+               HadoopUtils.mergeHadoopConf(job);
+               this.jobConf = job;
+       }
+
+       public JobConf getJobConf() {
+               return jobConf;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  OutputFormat
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void configure(Configuration parameters) {
+               // nothing to do
+       }
+
+       /**
+        * Creates the temporary output file for the Hadoop RecordWriter.
+        * @param taskNumber The number of the parallel instance.
+        * @param numTasks The number of parallel tasks.
+        * @throws java.io.IOException
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               if (Integer.toString(taskNumber + 1).length() > 6) {
+                       throw new IOException("Task id too large.");
+               }
+
+               TaskAttemptID taskAttemptID = 
TaskAttemptID.forName("attempt__0000_r_"
+                               + String.format("%" + (6 - 
Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
+                               + Integer.toString(taskNumber + 1)
+                               + "_0");
+
+               this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+               this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+               // for hadoop 2.2
+               this.jobConf.set("mapreduce.task.attempt.id", 
taskAttemptID.toString());
+               this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+               try {
+                       this.context = 
HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               this.fileOutputCommitter = new FileOutputCommitter();
+
+               try {
+                       this.jobContext = 
HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               this.fileOutputCommitter.setupJob(jobContext);
+
+               this.recordWriter = 
this.mapredOutputFormat.getRecordWriter(null, this.jobConf, 
Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+       }
+
+       /**
+        * Commits the task by moving the output file out of the temporary directory.
+        * @throws java.io.IOException
+        */
+       @Override
+       public void close() throws IOException {
+               this.recordWriter.close(new HadoopDummyReporter());
+               
+               if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+                       this.fileOutputCommitter.commitTask(this.context);
+               }
+       }
+       
+       @Override
+       public void finalizeGlobal(int parallelism) throws IOException {
+
+               try {
+                       JobContext jobContext = 
HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+                       FileOutputCommitter fileOutputCommitter = new 
FileOutputCommitter();
+                       
+                       // finalize HDFS output format
+                       fileOutputCommitter.commitJob(jobContext);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom serialization methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeUTF(mapredOutputFormat.getClass().getName());
+               jobConf.write(out);
+       }
+       
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               String hadoopOutputFormatName = in.readUTF();
+               if(jobConf == null) {
+                       jobConf = new JobConf();
+               }
+               jobConf.readFields(in);
+               try {
+                       this.mapredOutputFormat = 
(org.apache.hadoop.mapred.OutputFormat<K,V>) 
Class.forName(hadoopOutputFormatName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to instantiate the 
hadoop output format", e);
+               }
+               ReflectionUtils.setConf(mapredOutputFormat, jobConf);
+       }
+}

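A worked example of the attempt-ID padding in open() above: for taskNumber = 2 (the third parallel instance), Integer.toString(taskNumber + 1) is "3", the "%5s" format filled with zeros gives "00000", and the resulting attempt ID is

    attempt__0000_r_000003_0

which is what mapred.task.id and mapreduce.task.attempt.id are set to, with mapred.task.partition / mapreduce.task.partition set to 3.
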
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
new file mode 100644
index 0000000..d4dc297
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.utils;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopUtils.class);
+
+       /**
+        * Merges the Hadoop configuration into the JobConf. This is necessary for the HDFS configuration.
+        */
+       public static void mergeHadoopConf(JobConf jobConf) {
+               org.apache.hadoop.conf.Configuration hadoopConf = 
getHadoopConfiguration();
+               for (Map.Entry<String, String> e : hadoopConf) {
+                       jobConf.set(e.getKey(), e.getValue());
+               }
+       }
+       
+       public static JobContext instantiateJobContext(JobConf jobConf, JobID 
jobId) throws Exception {
+               try {
+                       // for Hadoop 1.xx
+                       Class<?> clazz = null;
+                       if(!TaskAttemptContext.class.isInterface()) { 
+                               clazz = 
Class.forName("org.apache.hadoop.mapred.JobContext", true, 
Thread.currentThread().getContextClassLoader());
+                       }
+                       // for Hadoop 2.xx
+                       else {
+                               clazz = 
Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, 
Thread.currentThread().getContextClassLoader());
+                       }
+                       Constructor<?> constructor = 
clazz.getDeclaredConstructor(JobConf.class, 
org.apache.hadoop.mapreduce.JobID.class);
+                       // for Hadoop 1.xx
+                       constructor.setAccessible(true);
+                       JobContext context = (JobContext) 
constructor.newInstance(jobConf, jobId);
+                       
+                       return context;
+               } catch(Exception e) {
+                       throw new Exception("Could not create instance of 
JobContext.", e);
+               }
+       }
+       
+       public static TaskAttemptContext instantiateTaskAttemptContext(JobConf 
jobConf,  TaskAttemptID taskAttemptID) throws Exception {
+               try {
+                       // for Hadoop 1.xx
+                       Class<?> clazz = null;
+                       if(!TaskAttemptContext.class.isInterface()) { 
+                               clazz = 
Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, 
Thread.currentThread().getContextClassLoader());
+                       }
+                       // for Hadoop 2.xx
+                       else {
+                               clazz = 
Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, 
Thread.currentThread().getContextClassLoader());
+                       }
+                       Constructor<?> constructor = 
clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
+                       // for Hadoop 1.xx
+                       constructor.setAccessible(true);
+                       TaskAttemptContext context = (TaskAttemptContext) 
constructor.newInstance(jobConf, taskAttemptID);
+                       return context;
+               } catch(Exception e) {
+                       throw new Exception("Could not create instance of 
TaskAttemptContext.", e);
+               }
+       }
+
+       /**
+        * Returns a new Hadoop Configuration object using the path to the Hadoop conf configured
+        * in the main configuration (flink-conf.yaml).
+        * This method is public because it is used in the HadoopDataSource.
+        */
+       public static org.apache.hadoop.conf.Configuration 
getHadoopConfiguration() {
+               Configuration retConf = new 
org.apache.hadoop.conf.Configuration();
+
+               // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
+               // the hdfs configuration
+               // Try to load HDFS configuration from Hadoop's own configuration files
+               // 1. approach: Flink configuration
+               final String hdfsDefaultPath = 
GlobalConfiguration.getString(ConfigConstants
+                               .HDFS_DEFAULT_CONFIG, null);
+               if (hdfsDefaultPath != null) {
+                       retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsDefaultPath));
+               } else {
+                       LOG.debug("Cannot find hdfs-default configuration 
file");
+               }
+
+               final String hdfsSitePath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+               if (hdfsSitePath != null) {
+                       retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsSitePath));
+               } else {
+                       LOG.debug("Cannot find hdfs-site configuration file");
+               }
+
+               // 2. Approach environment variables
+               String[] possibleHadoopConfPaths = new String[4];
+               possibleHadoopConfPaths[0] = 
GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+               if (System.getenv("HADOOP_HOME") != null) {
+                       possibleHadoopConfPaths[2] = 
System.getenv("HADOOP_HOME")+"/conf";
+                       possibleHadoopConfPaths[3] = 
System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+               }
+
+               for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+                       if (possibleHadoopConfPath != null) {
+                               if (new File(possibleHadoopConfPath).exists()) {
+                                       if (new File(possibleHadoopConfPath + 
"/core-site.xml").exists()) {
+                                               retConf.addResource(new 
org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Adding " + 
possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+                                               }
+                                       }
+                                       if (new File(possibleHadoopConfPath + 
"/hdfs-site.xml").exists()) {
+                                               retConf.addResource(new 
org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Adding " + 
possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+                                               }
+                                       }
+                               }
+                       }
+               }
+               return retConf;
+       }
+}

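A small sketch of how the utility is meant to be used (assumed usage mirroring the HadoopInputFormatBase constructor; the fs.default.name lookup is only an illustration):

    JobConf jobConf = new JobConf();
    HadoopUtils.mergeHadoopConf(jobConf);                 // copy the discovered Hadoop/HDFS settings into the JobConf
    String defaultFs = jobConf.get("fs.default.name");    // e.g. hdfs://namenode:9000, if configured
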
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
new file mode 100644
index 0000000..215b890
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This is a dummy {@link Progressable} that ignores all progress reports.
+ */
+public class HadoopDummyProgressable implements Progressable {
+       @Override
+       public void progress() { 
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
new file mode 100644
index 0000000..01104ac
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This is a dummy progress monitor / reporter
+ *
+ */
+public class HadoopDummyReporter implements Reporter {
+
+       @Override
+       public void progress() {
+       }
+
+       @Override
+       public void setStatus(String status) {
+
+       }
+
+       @Override
+       public Counter getCounter(Enum<?> name) {
+               return null;
+       }
+
+       @Override
+       public Counter getCounter(String group, String name) {
+               return null;
+       }
+
+       @Override
+       public void incrCounter(Enum<?> key, long amount) {
+
+       }
+
+       @Override
+       public void incrCounter(String group, String counter, long amount) {
+
+       }
+
+       @Override
+       public InputSplit getInputSplit() throws UnsupportedOperationException {
+               return null;
+       }
+       // There should be an @Override, but some CDH4 dependency does not contain this method
+       public float getProgress() {
+               return 0;
+       }
+
+}
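
Outside of a running MapReduce job there is no real Reporter, which is what this stub covers. A hedged sketch (not from this patch) of how such a dummy reporter can be handed to a plain mapred record reader; the format, split and JobConf here are placeholders:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class DummyReporterSketch {
            // Opens a reader without a running MR job by passing the no-op reporter.
            public static RecordReader<LongWritable, Text> open(TextInputFormat format, InputSplit split, JobConf conf) throws java.io.IOException {
                    return format.getRecordReader(split, conf,
                                    new org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter());
            }
    }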

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..beef5d7
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+
+       private static final long serialVersionUID = 1L;
+
+       
+       private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+       
+       private JobConf jobConf;
+       
+       private int splitNumber;
+       private String hadoopInputSplitTypeName;
+
+
+       public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+               return hadoopInputSplit;
+       }
+
+       public HadoopInputSplit() {
+               super();
+       }
+
+       public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+
+               this.splitNumber = splitNumber;
+               this.hadoopInputSplit = hInputSplit;
+               this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
+               this.jobConf = jobconf;
+
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(splitNumber);
+               out.writeUTF(hadoopInputSplitTypeName);
+               jobConf.write(out);
+               hadoopInputSplit.write(out);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.splitNumber = in.readInt();
+               this.hadoopInputSplitTypeName = in.readUTF();
+               if(hadoopInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+                                               Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(inputSplit);
+                       }
+                       catch (Exception e) {
+                               throw new RuntimeException("Unable to create InputSplit", e);
+                       }
+               }
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+               if (this.hadoopInputSplit instanceof Configurable) {
+                       ((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+               }
+               this.hadoopInputSplit.readFields(in);
+
+       }
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(splitNumber);
+               out.writeUTF(hadoopInputSplitTypeName);
+               jobConf.write(out);
+               hadoopInputSplit.write(out);
+
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               this.splitNumber=in.readInt();
+               this.hadoopInputSplitTypeName = in.readUTF();
+               if(hadoopInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+                                               Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(inputSplit);
+                       }
+                       catch (Exception e) {
+                               throw new RuntimeException("Unable to create InputSplit", e);
+                       }
+               }
+               jobConf = new JobConf();
+               jobConf.readFields(in);
+               if (this.hadoopInputSplit instanceof Configurable) {
+                       ((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+               }
+               this.hadoopInputSplit.readFields(in);
+       }
+
+       @Override
+       public int getSplitNumber() {
+               return this.splitNumber;
+       }
+
+       @Override
+       public String[] getHostnames() {
+               try {
+                       return this.hadoopInputSplit.getLocations();
+               } catch(IOException ioe) {
+                       return new String[0];
+               }
+       }
+}
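
A short sketch (illustrative only, not part of this patch) of how the wrapper carries a mapred split; the file path and host names are made up for the example:

    import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class HadoopInputSplitSketch {
            public static void main(String[] args) {
                    // Wrap a mapred FileSplit so Flink can ship and schedule it as a locatable split.
                    FileSplit fileSplit = new FileSplit(new Path("/tmp/input.txt"), 0L, 1024L, new String[] { "host1", "host2" });
                    HadoopInputSplit split = new HadoopInputSplit(0, fileSplit, new JobConf());
                    System.out.println(split.getSplitNumber());      // 0
                    System.out.println(split.getHostnames().length); // 2, taken from the FileSplit locations
            }
    }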

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
new file mode 100644
index 0000000..efe97f1
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K,V>> {
+
+       public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+               super(mapreduceInputFormat, key, value, job);
+       }
+
+       @Override
+       public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+               if(!this.fetched) {
+                       fetchNext();
+               }
+               if(!this.hasNext) {
+                       return null;
+               }
+               try {
+                       record.f0 = recordReader.getCurrentKey();
+                       record.f1 = recordReader.getCurrentValue();
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not get KeyValue pair.", e);
+               }
+               this.fetched = false;
+
+               return record;
+       }
+
+       @Override
+       public TypeInformation<Tuple2<K,V>> getProducedType() {
+               return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+       }
+}
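
For context, a hedged usage sketch of this mapreduce-API HadoopInputFormat with the DataSet API; the input and output paths are placeholders and the wiring mirrors the constructor above rather than code from this patch:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MapreduceInputSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                    Job job = Job.getInstance();
                    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path("hdfs:///tmp/input"));

                    // Each record arrives as a Tuple2 of the Hadoop key and value types.
                    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                                    new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job));

                    input.writeAsText("file:///tmp/flink-out");
                    env.execute("mapreduce input sketch");
            }
    }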

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
new file mode 100644
index 0000000..2a6c0f4
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+
+       private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
+       protected Class<K> keyClass;
+       protected Class<V> valueClass;
+       private org.apache.hadoop.conf.Configuration configuration;
+
+       protected transient RecordReader<K, V> recordReader;
+       protected boolean fetched = false;
+       protected boolean hasNext;
+
+       public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+               super();
+               this.mapreduceInputFormat = mapreduceInputFormat;
+               this.keyClass = key;
+               this.valueClass = value;
+               this.configuration = job.getConfiguration();
+               HadoopUtils.mergeHadoopConf(configuration);
+       }
+
+       public org.apache.hadoop.conf.Configuration getConfiguration() {
+               return this.configuration;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  InputFormat
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void configure(Configuration parameters) {
+               // nothing to do
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+               // only gather base statistics for FileInputFormats
+               if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+                       return null;
+               }
+
+               JobContext jobContext = null;
+               try {
+                       jobContext = HadoopUtils.instantiateJobContext(configuration, null);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+                               (FileBaseStatistics) cachedStats : null;
+
+               try {
+                       final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
+                       return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+               } catch (IOException ioex) {
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn("Could not determine statistics due to an io error: " + ioex.getMessage());
+                       }
+               } catch (Throwable t) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Unexpected problem while getting the file statistics: " + t.getMessage(), t);
+                       }
+               }
+
+               // no statistics available
+               return null;
+       }
+
+       @Override
+       public HadoopInputSplit[] createInputSplits(int minNumSplits)
+                       throws IOException {
+               configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+               JobContext jobContext = null;
+               try {
+                       jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               List<org.apache.hadoop.mapreduce.InputSplit> splits;
+               try {
+                       splits = this.mapreduceInputFormat.getSplits(jobContext);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not get Splits.", e);
+               }
+               HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+               for(int i = 0; i < hadoopInputSplits.length; i++){
+                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+               }
+               return hadoopInputSplits;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+
+       @Override
+       public void open(HadoopInputSplit split) throws IOException {
+               TaskAttemptContext context = null;
+               try {
+                       context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+               } catch(Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               try {
+                       this.recordReader = this.mapreduceInputFormat
+                                       .createRecordReader(split.getHadoopInputSplit(), context);
+                       this.recordReader.initialize(split.getHadoopInputSplit(), context);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not create RecordReader.", e);
+               } finally {
+                       this.fetched = false;
+               }
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               if(!this.fetched) {
+                       fetchNext();
+               }
+               return !this.hasNext;
+       }
+
+       protected void fetchNext() throws IOException {
+               try {
+                       this.hasNext = this.recordReader.nextKeyValue();
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not fetch next KeyValue pair.", e);
+               } finally {
+                       this.fetched = true;
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               this.recordReader.close();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Helper methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+                       ArrayList<FileStatus> files) throws IOException {
+
+               long latestModTime = 0L;
+
+               // get the file info and check whether the cached statistics are still valid.
+               for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+
+                       final Path filePath = new Path(hadoopPath.toUri());
+                       final FileSystem fs = FileSystem.get(filePath.toUri());
+
+                       final FileStatus file = fs.getFileStatus(filePath);
+                       latestModTime = Math.max(latestModTime, file.getModificationTime());
+
+                       // enumerate all files and check their modification time stamp.
+                       if (file.isDir()) {
+                               FileStatus[] fss = fs.listStatus(filePath);
+                               files.ensureCapacity(files.size() + fss.length);
+
+                               for (FileStatus s : fss) {
+                                       if (!s.isDir()) {
+                                               files.add(s);
+                                               latestModTime = Math.max(s.getModificationTime(), latestModTime);
+                                       }
+                               }
+                       } else {
+                               files.add(file);
+                       }
+               }
+
+               // check whether the cached statistics are still valid, if we have any
+               if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+                       return cachedStats;
+               }
+
+               // calculate the whole length
+               long len = 0;
+               for (FileStatus s : files) {
+                       len += s.getLen();
+               }
+
+               // sanity check
+               if (len <= 0) {
+                       len = BaseStatistics.SIZE_UNKNOWN;
+               }
+
+               return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom serialization methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeUTF(this.mapreduceInputFormat.getClass().getName());
+               out.writeUTF(this.keyClass.getName());
+               out.writeUTF(this.valueClass.getName());
+               this.configuration.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               String hadoopInputFormatClassName = in.readUTF();
+               String keyClassName = in.readUTF();
+               String valueClassName = in.readUTF();
+
+               org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+               configuration.readFields(in);
+
+               if(this.configuration == null) {
+                       this.configuration = configuration;
+               }
+
+               try {
+                       this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+               }
+               try {
+                       this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to find key class.", e);
+               }
+               try {
+                       this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to find value class.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
new file mode 100644
index 0000000..7d3675c
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.mapreduce.Job;
+
+public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
+
+       public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
+               super(mapreduceOutputFormat, job);
+       }
+       
+       @Override
+       public void writeRecord(Tuple2<K, V> record) throws IOException {
+               try {
+                       this.recordWriter.write(record.f0, record.f1);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not write Record.", e);
+               }
+       }
+}
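
And the corresponding hedged output-side sketch (again not part of this patch; the output path is a placeholder): a DataSet of Hadoop key/value tuples is emitted through a mapreduce TextOutputFormat:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MapreduceOutputSketch {
            // Writes (word, count) pairs through the Hadoop TextOutputFormat.
            public static void write(DataSet<Tuple2<Text, IntWritable>> counts, String outputPath) throws Exception {
                    Job job = Job.getInstance();
                    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(outputPath));

                    counts.output(new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job));
            }
    }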

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
new file mode 100644
index 0000000..a7ae428
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+
+public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster {
+
+       private static final long serialVersionUID = 1L;
+
+       private org.apache.hadoop.conf.Configuration configuration;
+       private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
+       protected transient RecordWriter<K,V> recordWriter;
+       private transient FileOutputCommitter fileOutputCommitter;
+       private transient TaskAttemptContext context;
+       private transient int taskNumber;
+
+       public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
+               super();
+               this.mapreduceOutputFormat = mapreduceOutputFormat;
+               this.configuration = job.getConfiguration();
+               HadoopUtils.mergeHadoopConf(configuration);
+       }
+
+       public org.apache.hadoop.conf.Configuration getConfiguration() {
+               return this.configuration;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  OutputFormat
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void configure(Configuration parameters) {
+               // nothing to do
+       }
+
+       /**
+        * Creates the temporary output file for the Hadoop RecordWriter.
+        * @param taskNumber The number of the parallel instance.
+        * @param numTasks The number of parallel tasks.
+        * @throws java.io.IOException
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               if (Integer.toString(taskNumber + 1).length() > 6) {
+                       throw new IOException("Task id too large.");
+               }
+
+               this.taskNumber = taskNumber+1;
+
+               // for hadoop 2.2
+               this.configuration.set("mapreduce.output.basename", "tmp");
+
+               TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+                               + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
+                               + Integer.toString(taskNumber + 1)
+                               + "_0");
+
+               this.configuration.set("mapred.task.id", taskAttemptID.toString());
+               this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+               // for hadoop 2.2
+               this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+               this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+
+               try {
+                       this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
+
+               try {
+                       this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+               this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+
+               try {
+                       this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not create RecordWriter.", e);
+               }
+       }
+
+       /**
+        * Commits the task by moving the output file out of the temporary directory.
+        * @throws java.io.IOException
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       this.recordWriter.close(this.context);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not close RecordWriter.", e);
+               }
+               
+               if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+                       this.fileOutputCommitter.commitTask(this.context);
+               }
+
+               Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+               // rename tmp-file to final name
+               FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+               String taskNumberStr = Integer.toString(this.taskNumber);
+               String tmpFileTemplate = "tmp-r-00000";
+               String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
+
+               if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+                       fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+               }
+       }
+       
+       @Override
+       public void finalizeGlobal(int parallelism) throws IOException {
+
+               JobContext jobContext;
+               TaskAttemptContext taskContext;
+               try {
+
+                       TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+                                       + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+                                       + Integer.toString(1)
+                                       + "_0");
+
+                       jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+                       taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+               this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+
+               // finalize HDFS output format
+               this.fileOutputCommitter.commitJob(jobContext);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom serialization methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
+               this.configuration.write(out);
+       }
+       
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               String hadoopOutputFormatClassName = in.readUTF();
+
+               org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+               configuration.readFields(in);
+
+               if(this.configuration == null) {
+                       this.configuration = configuration;
+               }
+
+               try {
+                       this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
new file mode 100644
index 0000000..fe8f8cc
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapreduce.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class HadoopUtils {
+       
+       /**
+        * Merges the auto-discovered Hadoop configuration into the given configuration. This is necessary to pick up the HDFS configuration.
+        */
+       public static void mergeHadoopConf(Configuration configuration) {
+               Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+
+               for (Map.Entry<String, String> e : hadoopConf) {
+                       configuration.set(e.getKey(), e.getValue());
+               }
+       }
+       
+       public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
+               try {
+                       Class<?> clazz;
+                       // for Hadoop 2.xx, where JobContext is an interface implemented by JobContextImpl
+                       if(JobContext.class.isInterface()) {
+                               clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+                       }
+                       // for Hadoop 1.xx, where JobContext is a concrete class
+                       else {
+                               clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
+                       }
+                       Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
+                       JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
+
+                       return context;
+               } catch(Exception e) {
+                       throw new Exception("Could not create instance of JobContext.", e);
+               }
+       }
+       
+       public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception {
+               try {
+                       Class<?> clazz;
+                       // for Hadoop 2.xx, where TaskAttemptContext is an interface implemented by TaskAttemptContextImpl
+                       if(JobContext.class.isInterface()) {
+                               clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+                       }
+                       // for Hadoop 1.xx, where TaskAttemptContext is a concrete class
+                       else {
+                               clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
+                       }
+                       Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
+                       TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
+
+                       return context;
+               } catch(Exception e) {
+                       throw new Exception("Could not create instance of TaskAttemptContext.", e);
+               }
+       }
+}
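
A one-line sketch of how mergeHadoopConf is meant to be used, mirroring the constructors of the input/output format bases above (the Job here is just a placeholder):

    import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
    import org.apache.hadoop.mapreduce.Job;

    public class MergeConfSketch {
            public static void main(String[] args) throws Exception {
                    Job job = Job.getInstance();
                    // Copies every entry of the auto-discovered Hadoop configuration
                    // (core-site.xml / hdfs-site.xml, if found) into the job's configuration.
                    HadoopUtils.mergeHadoopConf(job.getConfiguration());
                    System.out.println(job.getConfiguration().get("fs.defaultFS", "file:///"));
            }
    }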

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..f2758b3
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+       
+       public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
+       public transient JobContext jobContext;
+       
+       private int splitNumber;
+       
+       public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+               return mapreduceInputSplit;
+       }
+       
+       
+       public HadoopInputSplit() {
+               super();
+       }
+       
+       
+       public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+               this.splitNumber = splitNumber;
+               if(!(mapreduceInputSplit instanceof Writable)) {
+                       throw new IllegalArgumentException("InputSplit must implement Writable interface.");
+               }
+               this.mapreduceInputSplit = mapreduceInputSplit;
+               this.jobContext = jobContext;
+       }
+       
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(this.splitNumber);
+               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+               Writable w = (Writable) this.mapreduceInputSplit;
+               w.write(out);
+       }
+       
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.splitNumber = in.readInt();
+               String className = in.readUTF();
+               
+               if(this.mapreduceInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+                                               Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Unable to create InputSplit", e);
+                       }
+               }
+               ((Writable)this.mapreduceInputSplit).readFields(in);
+       }
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(this.splitNumber);
+               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+               Writable w = (Writable) this.mapreduceInputSplit;
+               w.write(out);
+
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               this.splitNumber=in.readInt();
+               String className = in.readUTF();
+
+               if(this.mapreduceInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+                                               Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Unable to create InputSplit", e);
+                       }
+               }
+               ((Writable)this.mapreduceInputSplit).readFields(in);
+       }
+       
+       @Override
+       public int getSplitNumber() {
+               return this.splitNumber;
+       }
+       
+       @Override
+       public String[] getHostnames() {
+               try {
+                       return this.mapreduceInputSplit.getLocations();
+               } catch (IOException e) {
+                       return new String[0];
+               } catch (InterruptedException e) {
+                       return new String[0];
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..89aa67e
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class HadoopInputFormatTest {
+
+
+       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+               public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+                       HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+                       TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+                       TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+                       if(tupleType.isTupleType()) {
+                               if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+                                       fail("Tuple type information was not set correctly!");
+                               }
+                       } else {
+                               fail("Type information was not set to tuple type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}
