http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..f2758b3
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+       
+       public transient org.apache.hadoop.mapreduce.InputSplit 
mapreduceInputSplit;
+       public transient JobContext jobContext;
+       
+       private int splitNumber;
+       
+       public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+               return mapreduceInputSplit;
+       }
+       
+       
+       public HadoopInputSplit() {
+               super();
+       }
+       
+       
+       public HadoopInputSplit(int splitNumber, 
org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext 
jobContext) {
+               this.splitNumber = splitNumber;
+               if(!(mapreduceInputSplit instanceof Writable)) {
+                       throw new IllegalArgumentException("InputSplit must 
implement Writable interface.");
+               }
+               this.mapreduceInputSplit = mapreduceInputSplit;
+               this.jobContext = jobContext;
+       }
+       
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(this.splitNumber);
+               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+               Writable w = (Writable) this.mapreduceInputSplit;
+               w.write(out);
+       }
+       
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.splitNumber = in.readInt();
+               String className = in.readUTF();
+               
+               if(this.mapreduceInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit = 
+                                               
Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.mapreduceInputSplit = 
(org.apache.hadoop.mapreduce.InputSplit) 
WritableFactories.newInstance(inputSplit);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Unable to create 
InputSplit", e);
+                       }
+               }
+               ((Writable)this.mapreduceInputSplit).readFields(in);
+       }
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(this.splitNumber);
+               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+               Writable w = (Writable) this.mapreduceInputSplit;
+               w.write(out);
+
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               this.splitNumber=in.readInt();
+               String className = in.readUTF();
+
+               if(this.mapreduceInputSplit == null) {
+                       try {
+                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit =
+                                               
Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+                               this.mapreduceInputSplit = 
(org.apache.hadoop.mapreduce.InputSplit) 
WritableFactories.newInstance(inputSplit);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Unable to create 
InputSplit", e);
+                       }
+               }
+               ((Writable)this.mapreduceInputSplit).readFields(in);
+       }
+       
+       @Override
+       public int getSplitNumber() {
+               return this.splitNumber;
+       }
+       
+       @Override
+       public String[] getHostnames() {
+               try {
+                       return this.mapreduceInputSplit.getLocations();
+               } catch (IOException e) {
+                       return new String[0];
+               } catch (InterruptedException e) {
+                       return new String[0];
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..89aa67e
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class HadoopInputFormatTest {
+
+
+       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, 
T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+               public org.apache.hadoop.mapred.RecordReader<Void, T> 
getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf 
jobConf, Reporter reporter) throws IOException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+                       HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, 
Long.class, new JobConf());
+
+                       TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
+                       TypeInformation<Tuple2<Void,Long>> testTupleType = new 
TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO);
+                       
+                       if(tupleType.isTupleType()) {
+                               
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+                                       fail("Tuple type information was not 
set correctly!");
+                               }
+                       } else {
+                               fail("Type information was not set to tuple 
type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..236d149
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, 
T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+               public RecordReader<Void, T> createRecordReader(InputSplit 
inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+                       HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, 
Long.class, job);
+
+                       TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
+                       TypeInformation<Tuple2<Void,Long>> testTupleType = new 
TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO);
+                       
+                       if(tupleType.isTupleType()) {
+                               
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+                                       fail("Tuple type information was not 
set correctly!");
+                               }
+                       } else {
+                               fail("Type information was not set to tuple 
type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6305619..e193770 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -27,16 +27,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.flink.api.scala.hadoop.mapreduce
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
 
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
-CollectionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, 
CollectionEnvironment}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
 
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
+import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat,
+InputFormat => MapredInputFormat, JobConf}
+import org.apache.hadoop.fs.{Path => HadoopPath}
+
 
 import scala.collection.JavaConverters._
 
@@ -304,6 +311,92 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
  /**
   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
   * given inputPath is added as an input path to the given
   * [[org.apache.hadoop.mapred.JobConf]].
   */
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
    val result = createHadoopInput(mapredInputFormat, key, value, job)
    // NOTE(review): the path is registered on the JobConf after the DataSet is created;
    // presumably the JobConf is not read until job execution — confirm
    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
    result
  }
+
  /**
   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
   * new [[org.apache.hadoop.mapred.JobConf]] is created and the given inputPath is added
   * to it.
   */
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
  }
+
  /**
   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]],
   * wrapped in a Flink [[org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat]]
   * that emits (key, value) tuples.
   */
  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
    val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
    createInput(hadoopInputFormat)
  }
+
  /**
   * Creates a [[DataSet]] from the given
   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. The given inputPath is
   * added as an input path to the given [[org.apache.hadoop.mapreduce.Job]].
   */
  def readHadoopFile[K, V](
      // NOTE(review): despite the "mapred" prefix, this is a mapreduce-API InputFormat
      mapredInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: Job)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
    val result = createHadoopInput(mapredInputFormat, key, value, job)
    // NOTE(review): the path is registered on the Job after the DataSet is created;
    // presumably the Job configuration is not read until execution — confirm
    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
    result
  }
+
  /**
   * Creates a [[DataSet]] from the given
   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A new
   * [[org.apache.hadoop.mapreduce.Job]] is created and the given inputPath is added to it.
   */
  def readHadoopFile[K, V](
      // NOTE(review): despite the "mapred" prefix, this is a mapreduce-API InputFormat
      mapredInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
    readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance)
  }
+
  /**
   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]],
   * wrapped in a Flink [[org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat]]
   * that emits (key, value) tuples.
   */
  def createHadoopInput[K, V](
      // NOTE(review): despite the "mapred" prefix, this is a mapreduce-API InputFormat
      mapredInputFormat: MapreduceInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: Job)
      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
    val hadoopInputFormat =
      new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
    createInput(hadoopInputFormat)
  }
+
+  /**
    * Creates a DataSet from the given non-empty [[Seq]]. The elements need to 
be serializable
    * because the framework may move the elements into the cluster if needed.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
new file mode 100644
index 0000000..d03e433
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
+import org.apache.hadoop.mapred.{JobConf, InputFormat}
+
/**
 * Scala wrapper for the mapred-API Hadoop input format that emits records as
 * (key, value) tuples.
 */
class HadoopInputFormat[K, V](
    mapredInputFormat: InputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    job: JobConf)
  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {

  /** Returns the next (key, value) pair, or null once the input is exhausted. */
  def nextRecord(reuse: (K, V)): (K, V) = {
    if (!fetched) {
      fetchNext()
    }
    if (hasNext) {
      fetched = false
      (key, value)
    } else {
      null
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
new file mode 100644
index 0000000..56b7a7f
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+
/**
 * Scala wrapper for the mapred-API Hadoop output format that accepts records as
 * (key, value) tuples.
 */
class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {

  /** Writes one tuple via the wrapped Hadoop record writer. */
  def writeRecord(record: (K, V)) {
    val (k, v) = record
    this.recordWriter.write(k, v)
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
new file mode 100644
index 0000000..9c94f29
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
+import org.apache.hadoop.mapreduce.{InputFormat, Job}
+
/**
 * Scala wrapper for the mapreduce-API Hadoop input format that emits records as
 * (key, value) tuples.
 */
class HadoopInputFormat[K, V](
    mapredInputFormat: InputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    job: Job)
  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {

  /** Returns the next (key, value) pair, or null once the input is exhausted. */
  def nextRecord(reuse: (K, V)): (K, V) = {
    if (!fetched) {
      fetchNext()
    }
    if (hasNext) {
      fetched = false
      (recordReader.getCurrentKey, recordReader.getCurrentValue)
    } else {
      null
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
new file mode 100644
index 0000000..b8ba3c1
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
+import org.apache.hadoop.mapreduce.{Job, OutputFormat}
+
/**
 * Scala wrapper for the mapreduce-API Hadoop output format that accepts records as
 * (key, value) tuples.
 */
class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job)
  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {

  /** Writes one tuple via the wrapped Hadoop record writer. */
  def writeRecord(record: (K, V)) {
    val (k, v) = record
    this.recordWriter.write(k, v)
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
deleted file mode 100644
index 326a1c4..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, 
HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopInputFormat.class);
-       
-       private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
-       private Class<K> keyClass;
-       private Class<V> valueClass;
-       private JobConf jobConf;
-       
-       private transient K key;
-       private transient V value;
-       
-       private transient RecordReader<K, V> recordReader;
-       private transient boolean fetched = false;
-       private transient boolean hasNext;
-
-       public HadoopInputFormat() {
-               super();
-       }
-       
-       public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> 
mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-               super();
-               this.mapredInputFormat = mapredInputFormat;
-               this.keyClass = key;
-               this.valueClass = value;
-               HadoopUtils.mergeHadoopConf(job);
-               this.jobConf = job;
-               ReflectionUtils.setConf(mapredInputFormat, jobConf);
-       }
-       
-       public void setJobConf(JobConf job) {
-               this.jobConf = job;
-       }
-       
-       public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() 
{
-               return mapredInputFormat;
-       }
-       
-       public void 
setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> 
mapredInputFormat) {
-               this.mapredInputFormat = mapredInputFormat;
-       }
-       
-       public JobConf getJobConf() {
-               return jobConf;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  InputFormat
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to do
-       }
-       
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws 
IOException {
-               // only gather base statistics for FileInputFormats
-               if(!(mapredInputFormat instanceof FileInputFormat)) {
-                       return null;
-               }
-               
-               final FileBaseStatistics cachedFileStats = (cachedStats != null 
&& cachedStats instanceof FileBaseStatistics) ?
-                               (FileBaseStatistics) cachedStats : null;
-               
-               try {
-                       final org.apache.hadoop.fs.Path[] paths = 
FileInputFormat.getInputPaths(this.jobConf);
-                       
-                       return getFileStats(cachedFileStats, paths, new 
ArrayList<FileStatus>(1));
-               } catch (IOException ioex) {
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn("Could not determine statistics due to 
an io error: "
-                                               + ioex.getMessage());
-                       }
-               } catch (Throwable t) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Unexpected problem while getting the 
file statistics: "
-                                               + t.getMessage(), t);
-                       }
-               }
-               
-               // no statistics available
-               return null;
-       }
-       
-       @Override
-       public HadoopInputSplit[] createInputSplits(int minNumSplits)
-                       throws IOException {
-               org.apache.hadoop.mapred.InputSplit[] splitArray = 
mapredInputFormat.getSplits(jobConf, minNumSplits);
-               HadoopInputSplit[] hiSplit = new 
HadoopInputSplit[splitArray.length];
-               for(int i=0;i<splitArray.length;i++){
-                       hiSplit[i] = new HadoopInputSplit(i, splitArray[i], 
jobConf);
-               }
-               return hiSplit;
-       }
-       
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] 
inputSplits) {
-               return new LocatableInputSplitAssigner(inputSplits);
-       }
-       
-       @Override
-       public void open(HadoopInputSplit split) throws IOException {
-               this.recordReader = 
this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, 
new HadoopDummyReporter());
-               if (this.recordReader instanceof Configurable) {
-                       ((Configurable) this.recordReader).setConf(jobConf);
-               }
-               key = this.recordReader.createKey();
-               value = this.recordReader.createValue();
-               this.fetched = false;
-       }
-       
-       @Override
-       public boolean reachedEnd() throws IOException {
-               if(!fetched) {
-                       fetchNext();
-               }
-               return !hasNext;
-       }
-       
-       private void fetchNext() throws IOException {
-               hasNext = this.recordReader.next(key, value);
-               fetched = true;
-       }
-       
-       @Override
-       public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-               if(!fetched) {
-                       fetchNext();
-               }
-               if(!hasNext) {
-                       return null;
-               }
-               record.f0 = key;
-               record.f1 = value;
-               fetched = false;
-               return record;
-       }
-       
-       @Override
-       public void close() throws IOException {
-               this.recordReader.close();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Helper methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, 
org.apache.hadoop.fs.Path[] hadoopFilePaths,
-                       ArrayList<FileStatus> files) throws IOException {
-               
-               long latestModTime = 0L;
-               
-               // get the file info and check whether the cached statistics 
are still valid.
-               for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-                       
-                       final Path filePath = new Path(hadoopPath.toUri());
-                       final FileSystem fs = FileSystem.get(filePath.toUri());
-                       
-                       final FileStatus file = fs.getFileStatus(filePath);
-                       latestModTime = Math.max(latestModTime, 
file.getModificationTime());
-                       
-                       // enumerate all files and check their modification 
time stamp.
-                       if (file.isDir()) {
-                               FileStatus[] fss = fs.listStatus(filePath);
-                               files.ensureCapacity(files.size() + fss.length);
-                               
-                               for (FileStatus s : fss) {
-                                       if (!s.isDir()) {
-                                               files.add(s);
-                                               latestModTime = 
Math.max(s.getModificationTime(), latestModTime);
-                                       }
-                               }
-                       } else {
-                               files.add(file);
-                       }
-               }
-               
-               // check whether the cached statistics are still valid, if we 
have any
-               if (cachedStats != null && latestModTime <= 
cachedStats.getLastModificationTime()) {
-                       return cachedStats;
-               }
-               
-               // calculate the whole length
-               long len = 0;
-               for (FileStatus s : files) {
-                       len += s.getLen();
-               }
-               
-               // sanity check
-               if (len <= 0) {
-                       len = BaseStatistics.SIZE_UNKNOWN;
-               }
-               
-               return new FileBaseStatistics(latestModTime, len, 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(mapredInputFormat.getClass().getName());
-               out.writeUTF(keyClass.getName());
-               out.writeUTF(valueClass.getName());
-               jobConf.write(out);
-       }
-       
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               String hadoopInputFormatClassName = in.readUTF();
-               String keyClassName = in.readUTF();
-               String valueClassName = in.readUTF();
-               if(jobConf == null) {
-                       jobConf = new JobConf();
-               }
-               jobConf.readFields(in);
-               try {
-                       this.mapredInputFormat = 
(org.apache.hadoop.mapred.InputFormat<K,V>) 
Class.forName(hadoopInputFormatClassName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the 
hadoop input format", e);
-               }
-               try {
-                       this.keyClass = (Class<K>) Class.forName(keyClassName, 
true, Thread.currentThread().getContextClassLoader());
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to find key class.", 
e);
-               }
-               try {
-                       this.valueClass = (Class<V>) 
Class.forName(valueClassName, true, 
Thread.currentThread().getContextClassLoader());
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to find value 
class.", e);
-               }
-               ReflectionUtils.setConf(mapredInputFormat, jobConf);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  ResultTypeQueryable
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public TypeInformation<Tuple2<K,V>> getProducedType() {
-               return new 
TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index dfe0067..4d81daf 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
deleted file mode 100644
index f3abfcd..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, 
FinalizeOnMaster {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private JobConf jobConf;
-       private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;  
-       private transient RecordWriter<K,V> recordWriter;       
-       private transient FileOutputCommitter fileOutputCommitter;
-       private transient TaskAttemptContext context;
-       private transient JobContext jobContext;
-       
-       public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> 
mapredOutputFormat, JobConf job) {
-               super();
-               this.mapredOutputFormat = mapredOutputFormat;
-               HadoopUtils.mergeHadoopConf(job);
-               this.jobConf = job;
-       }
-       
-       public void setJobConf(JobConf job) {
-               this.jobConf = job;
-       }
-       
-       public JobConf getJobConf() {
-               return jobConf;
-       }
-       
-       public org.apache.hadoop.mapred.OutputFormat<K,V> 
getHadoopOutputFormat() {
-               return mapredOutputFormat;
-       }
-       
-       public void 
setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> 
mapredOutputFormat) {
-               this.mapredOutputFormat = mapredOutputFormat;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  OutputFormat
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to do
-       }
-       
-       /**
-        * create the temporary output file for hadoop RecordWriter.
-        * @param taskNumber The number of the parallel instance.
-        * @param numTasks The number of parallel tasks.
-        * @throws IOException
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               if (Integer.toString(taskNumber + 1).length() > 6) {
-                       throw new IOException("Task id too large.");
-               }
-               
-               TaskAttemptID taskAttemptID = 
TaskAttemptID.forName("attempt__0000_r_" 
-                               + String.format("%" + (6 - 
Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
-                               + Integer.toString(taskNumber + 1) 
-                               + "_0");
-               
-               this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-               this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-               // for hadoop 2.2
-               this.jobConf.set("mapreduce.task.attempt.id", 
taskAttemptID.toString());
-               this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-               
-               try {
-                       this.context = 
HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               this.fileOutputCommitter = new FileOutputCommitter();
-               
-               try {
-                       this.jobContext = 
HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               this.fileOutputCommitter.setupJob(jobContext);
-               
-               this.recordWriter = 
this.mapredOutputFormat.getRecordWriter(null, this.jobConf, 
Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
-       }
-       
-       @Override
-       public void writeRecord(Tuple2<K, V> record) throws IOException {
-               this.recordWriter.write(record.f0, record.f1);
-       }
-       
-       /**
-        * commit the task by moving the output file out from the temporary 
directory.
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               this.recordWriter.close(new HadoopDummyReporter());
-               
-               if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-                       this.fileOutputCommitter.commitTask(this.context);
-               }
-       }
-       
-       @Override
-       public void finalizeGlobal(int parallelism) throws IOException {
-
-               try {
-                       JobContext jobContext = 
HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-                       FileOutputCommitter fileOutputCommitter = new 
FileOutputCommitter();
-                       
-                       // finalize HDFS output format
-                       fileOutputCommitter.commitJob(jobContext);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(mapredOutputFormat.getClass().getName());
-               jobConf.write(out);
-       }
-       
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               String hadoopOutputFormatName = in.readUTF();
-               if(jobConf == null) {
-                       jobConf = new JobConf();
-               }
-               jobConf.readFields(in);
-               try {
-                       this.mapredOutputFormat = 
(org.apache.hadoop.mapred.OutputFormat<K,V>) 
Class.forName(hadoopOutputFormatName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the 
hadoop output format", e);
-               }
-               ReflectionUtils.setConf(mapredOutputFormat, jobConf);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index aa9f048..376fd70 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index d9797c3..fd2c493 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index de20fab..3547e47 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
index 275fd4c..edcc43b 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -29,9 +29,9 @@ import 
org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
 import 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
 import org.apache.flink.types.Record;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
index 74118a3..e519062 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
 import 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.types.Record;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
deleted file mode 100644
index 2d2f518..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-
-public class HadoopUtils {
-       
-       /**
-        * Merge HadoopConfiguration into JobConf. This is necessary for the 
HDFS configuration.
-        */
-       public static void mergeHadoopConf(JobConf jobConf) {
-               org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopFileSystem.getHadoopConfiguration();
-               for (Map.Entry<String, String> e : hadoopConf) {
-                       jobConf.set(e.getKey(), e.getValue());
-               }
-       }
-       
-       public static JobContext instantiateJobContext(JobConf jobConf, JobID 
jobId) throws Exception {
-               try {
-                       // for Hadoop 1.xx
-                       Class<?> clazz = null;
-                       if(!TaskAttemptContext.class.isInterface()) { 
-                               clazz = 
Class.forName("org.apache.hadoop.mapred.JobContext", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       // for Hadoop 2.xx
-                       else {
-                               clazz = 
Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       Constructor<?> constructor = 
clazz.getDeclaredConstructor(JobConf.class, 
org.apache.hadoop.mapreduce.JobID.class);
-                       // for Hadoop 1.xx
-                       constructor.setAccessible(true);
-                       JobContext context = (JobContext) 
constructor.newInstance(jobConf, jobId);
-                       
-                       return context;
-               } catch(Exception e) {
-                       throw new Exception("Could not create instance of 
JobContext.", e);
-               }
-       }
-       
-       public static TaskAttemptContext instantiateTaskAttemptContext(JobConf 
jobConf,  TaskAttemptID taskAttemptID) throws Exception {
-               try {
-                       // for Hadoop 1.xx
-                       Class<?> clazz = null;
-                       if(!TaskAttemptContext.class.isInterface()) { 
-                               clazz = 
Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       // for Hadoop 2.xx
-                       else {
-                               clazz = 
Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       Constructor<?> constructor = 
clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
-                       // for Hadoop 1.xx
-                       constructor.setAccessible(true);
-                       TaskAttemptContext context = (TaskAttemptContext) 
constructor.newInstance(jobConf, taskAttemptID);
-                       return context;
-               } catch(Exception e) {
-                       throw new Exception("Could not create instance of 
TaskAttemptContext.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
deleted file mode 100644
index 483dd2f..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * This is a dummy progress
- *
- */
-public class HadoopDummyProgressable implements Progressable {
-       @Override
-       public void progress() { 
-               
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
deleted file mode 100644
index 84a1e9e..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This is a dummy progress monitor / reporter
- *
- */
-public class HadoopDummyReporter implements Reporter {
-
-       @Override
-       public void progress() {
-       }
-
-       @Override
-       public void setStatus(String status) {
-
-       }
-
-       @Override
-       public Counter getCounter(Enum<?> name) {
-               return null;
-       }
-
-       @Override
-       public Counter getCounter(String group, String name) {
-               return null;
-       }
-
-       @Override
-       public void incrCounter(Enum<?> key, long amount) {
-
-       }
-
-       @Override
-       public void incrCounter(String group, String counter, long amount) {
-
-       }
-
-       @Override
-       public InputSplit getInputSplit() throws UnsupportedOperationException {
-               return null;
-       }
-       // There should be an @Override, but some CDH4 dependency does not 
contain this method
-       public float getProgress() {
-               return 0;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
deleted file mode 100644
index aa2155d..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-
-
-public class HadoopInputSplit extends LocatableInputSplit {
-
-       private static final long serialVersionUID = 1L;
-
-       
-       private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
-       
-       private JobConf jobConf;
-       
-       private int splitNumber;
-       private String hadoopInputSplitTypeName;
-
-
-       public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-               return hadoopInputSplit;
-       }
-
-       public HadoopInputSplit() {
-               super();
-       }
-
-       public HadoopInputSplit(int splitNumber, 
org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
-
-               this.splitNumber = splitNumber;
-               this.hadoopInputSplit = hInputSplit;
-               this.hadoopInputSplitTypeName = 
hInputSplit.getClass().getName();
-               this.jobConf = jobconf;
-
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeInt(splitNumber);
-               out.writeUTF(hadoopInputSplitTypeName);
-               jobConf.write(out);
-               hadoopInputSplit.write(out);
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               this.splitNumber = in.readInt();
-               this.hadoopInputSplitTypeName = in.readUTF();
-               if(hadoopInputSplit == null) {
-                       try {
-                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit =
-                                               
Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-                               this.hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit 
);
-                       }
-                       catch (Exception e) {
-                               throw new RuntimeException("Unable to create 
InputSplit", e);
-                       }
-               }
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-               if (this.hadoopInputSplit instanceof Configurable) {
-                       ((Configurable) 
this.hadoopInputSplit).setConf(this.jobConf);
-               }
-               this.hadoopInputSplit.readFields(in);
-
-       }
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeInt(splitNumber);
-               out.writeUTF(hadoopInputSplitTypeName);
-               jobConf.write(out);
-               hadoopInputSplit.write(out);
-
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               this.splitNumber=in.readInt();
-               this.hadoopInputSplitTypeName = in.readUTF();
-               if(hadoopInputSplit == null) {
-                       try {
-                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit =
-                                               
Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-                               this.hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit 
);
-                       }
-                       catch (Exception e) {
-                               throw new RuntimeException("Unable to create 
InputSplit", e);
-                       }
-               }
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-               if (this.hadoopInputSplit instanceof Configurable) {
-                       ((Configurable) 
this.hadoopInputSplit).setConf(this.jobConf);
-               }
-               this.hadoopInputSplit.readFields(in);
-       }
-
-       @Override
-       public int getSplitNumber() {
-               return this.splitNumber;
-       }
-
-       @Override
-       public String[] getHostnames() {
-               try {
-                       return this.hadoopInputSplit.getLocations();
-               } catch(IOException ioe) {
-                       return new String[0];
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
deleted file mode 100644
index 20006b8..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, 
HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopInputFormat.class);
-
-       private org.apache.hadoop.mapreduce.InputFormat<K, V> 
mapreduceInputFormat;
-       private Class<K> keyClass;
-       private Class<V> valueClass;
-       private org.apache.hadoop.conf.Configuration configuration;
-
-       private transient RecordReader<K, V> recordReader;
-       private boolean fetched = false;
-       private boolean hasNext;
-
-       public HadoopInputFormat() {
-               super();
-       }
-
-       public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> 
mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-               super();
-               this.mapreduceInputFormat = mapreduceInputFormat;
-               this.keyClass = key;
-               this.valueClass = value;
-               this.configuration = job.getConfiguration();
-               HadoopUtils.mergeHadoopConf(configuration);
-       }
-
-       public void setConfiguration(org.apache.hadoop.conf.Configuration 
configuration) {
-               this.configuration = configuration;
-       }
-
-       public org.apache.hadoop.mapreduce.InputFormat<K,V> 
getHadoopInputFormat() {
-               return this.mapreduceInputFormat;
-       }
-
-       public void 
setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> 
mapreduceInputFormat) {
-               this.mapreduceInputFormat = mapreduceInputFormat;
-       }
-
-       public org.apache.hadoop.conf.Configuration getConfiguration() {
-               return this.configuration;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  InputFormat
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to do
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws 
IOException {
-               // only gather base statistics for FileInputFormats
-               if(!(mapreduceInputFormat instanceof FileInputFormat)) {
-                       return null;
-               }
-
-               JobContext jobContext = null;
-               try {
-                       jobContext = 
HadoopUtils.instantiateJobContext(configuration, null);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               final FileBaseStatistics cachedFileStats = (cachedStats != null 
&& cachedStats instanceof FileBaseStatistics) ?
-                               (FileBaseStatistics) cachedStats : null;
-                               
-                               try {
-                                       final org.apache.hadoop.fs.Path[] paths 
= FileInputFormat.getInputPaths(jobContext);
-                                       return getFileStats(cachedFileStats, 
paths, new ArrayList<FileStatus>(1));
-                               } catch (IOException ioex) {
-                                       if (LOG.isWarnEnabled()) {
-                                               LOG.warn("Could not determine 
statistics due to an io error: " 
-                                                               + 
ioex.getMessage());
-                                       }
-                               } catch (Throwable t) {
-                                       if (LOG.isErrorEnabled()) {
-                                               LOG.error("Unexpected problem 
while getting the file statistics: "
-                                                               + 
t.getMessage(), t);
-                                       }
-                               }
-                               
-                               // no statistics available
-                               return null;
-       }
-
-       @Override
-       public HadoopInputSplit[] createInputSplits(int minNumSplits)
-                       throws IOException {
-               
configuration.setInt("mapreduce.input.fileinputformat.split.minsize", 
minNumSplits);
-
-               JobContext jobContext = null;
-               try {
-                       jobContext = 
HadoopUtils.instantiateJobContext(configuration, new JobID());
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               List<org.apache.hadoop.mapreduce.InputSplit> splits;
-               try {
-                       splits = 
this.mapreduceInputFormat.getSplits(jobContext);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get Splits.", e);
-               }
-               HadoopInputSplit[] hadoopInputSplits = new 
HadoopInputSplit[splits.size()];
-
-               for(int i = 0; i < hadoopInputSplits.length; i++){
-                       hadoopInputSplits[i] = new HadoopInputSplit(i, 
splits.get(i), jobContext);
-               }
-               return hadoopInputSplits;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] 
inputSplits) {
-               return new LocatableInputSplitAssigner(inputSplits);
-       }
-
-       @Override
-       public void open(HadoopInputSplit split) throws IOException {
-               TaskAttemptContext context = null;
-               try {
-                       context = 
HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-               } catch(Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               try {
-                       this.recordReader = this.mapreduceInputFormat
-                                       
.createRecordReader(split.getHadoopInputSplit(), context);
-                       
this.recordReader.initialize(split.getHadoopInputSplit(), context);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not create RecordReader.", 
e);
-               } finally {
-                       this.fetched = false;
-               }
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               if(!this.fetched) {
-                       fetchNext();
-               }
-               return !this.hasNext;
-       }
-
-       private void fetchNext() throws IOException {
-               try {
-                       this.hasNext = this.recordReader.nextKeyValue();
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not fetch next KeyValue 
pair.", e);
-               } finally {
-                       this.fetched = true;
-               }
-       }
-
-       @Override
-       public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-               if(!this.fetched) {
-                       fetchNext();
-               }
-               if(!this.hasNext) {
-                       return null;
-               }
-               try {
-                       record.f0 = this.recordReader.getCurrentKey();
-                       record.f1 = this.recordReader.getCurrentValue();
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get KeyValue pair.", 
e);
-               }
-               this.fetched = false;
-
-               return record;
-       }
-
-       @Override
-       public void close() throws IOException {
-               this.recordReader.close();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Helper methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, 
org.apache.hadoop.fs.Path[] hadoopFilePaths, 
-                                                                               
        ArrayList<FileStatus> files) throws IOException {
-
-               long latestModTime = 0L;
-
-               // get the file info and check whether the cached statistics 
are still valid.
-               for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-
-                       final Path filePath = new Path(hadoopPath.toUri());
-                       final FileSystem fs = FileSystem.get(filePath.toUri());
-
-                       final FileStatus file = fs.getFileStatus(filePath);
-                       latestModTime = Math.max(latestModTime, 
file.getModificationTime());
-
-                       // enumerate all files and check their modification 
time stamp.
-                       if (file.isDir()) {
-                               FileStatus[] fss = fs.listStatus(filePath);
-                               files.ensureCapacity(files.size() + fss.length);
-
-                               for (FileStatus s : fss) {
-                                       if (!s.isDir()) {
-                                               files.add(s);
-                                               latestModTime = 
Math.max(s.getModificationTime(), latestModTime);
-                                       }
-                               }
-                       } else {
-                               files.add(file);
-                       }
-               }
-
-               // check whether the cached statistics are still valid, if we 
have any
-               if (cachedStats != null && latestModTime <= 
cachedStats.getLastModificationTime()) {
-                       return cachedStats;
-               }
-
-               // calculate the whole length
-               long len = 0;
-               for (FileStatus s : files) {
-                       len += s.getLen();
-               }
-
-               // sanity check
-               if (len <= 0) {
-                       len = BaseStatistics.SIZE_UNKNOWN;
-               }
-
-               return new FileBaseStatistics(latestModTime, len, 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(this.mapreduceInputFormat.getClass().getName());
-               out.writeUTF(this.keyClass.getName());
-               out.writeUTF(this.valueClass.getName());
-               this.configuration.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               String hadoopInputFormatClassName = in.readUTF();
-               String keyClassName = in.readUTF();
-               String valueClassName = in.readUTF();
-
-               org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
-               configuration.readFields(in);
-
-               if(this.configuration == null) {
-                       this.configuration = configuration;
-               }
-
-               try {
-                       this.mapreduceInputFormat = 
(org.apache.hadoop.mapreduce.InputFormat<K,V>) 
Class.forName(hadoopInputFormatClassName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the 
hadoop input format", e);
-               }
-               try {
-                       this.keyClass = (Class<K>) Class.forName(keyClassName, 
true, Thread.currentThread().getContextClassLoader());
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to find key class.", 
e);
-               }
-               try {
-                       this.valueClass = (Class<V>) 
Class.forName(valueClassName, true, 
Thread.currentThread().getContextClassLoader());
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to find value 
class.", e);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  ResultTypeQueryable
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public TypeInformation<Tuple2<K,V>> getProducedType() {
-               return new 
TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
deleted file mode 100644
index 696e1be..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-
-public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, 
FinalizeOnMaster {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private org.apache.hadoop.conf.Configuration configuration;
-       private org.apache.hadoop.mapreduce.OutputFormat<K,V> 
mapreduceOutputFormat;
-       private transient RecordWriter<K,V> recordWriter;
-       private transient FileOutputCommitter fileOutputCommitter;
-       private transient TaskAttemptContext context;
-       private transient int taskNumber;
-       
-       public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> 
mapreduceOutputFormat, Job job) {
-               super();
-               this.mapreduceOutputFormat = mapreduceOutputFormat;
-               this.configuration = job.getConfiguration();
-               HadoopUtils.mergeHadoopConf(configuration);
-       }
-       
-       public void setConfiguration(org.apache.hadoop.conf.Configuration 
configuration) {
-               this.configuration = configuration;
-       }
-       
-       public org.apache.hadoop.conf.Configuration getConfiguration() {
-               return this.configuration;
-       }
-       
-       public org.apache.hadoop.mapreduce.OutputFormat<K,V> 
getHadoopOutputFormat() {
-               return this.mapreduceOutputFormat;
-       }
-       
-       public void 
setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> 
mapreduceOutputFormat) {
-               this.mapreduceOutputFormat = mapreduceOutputFormat;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  OutputFormat
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to do
-       }
-       
-       /**
-        * create the temporary output file for hadoop RecordWriter.
-        * @param taskNumber The number of the parallel instance.
-        * @param numTasks The number of parallel tasks.
-        * @throws IOException
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               if (Integer.toString(taskNumber + 1).length() > 6) {
-                       throw new IOException("Task id too large.");
-               }
-               
-               this.taskNumber = taskNumber+1;
-               
-               // for hadoop 2.2
-               this.configuration.set("mapreduce.output.basename", "tmp");
-               
-               TaskAttemptID taskAttemptID = 
TaskAttemptID.forName("attempt__0000_r_" 
-                               + String.format("%" + (6 - 
Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
-                               + Integer.toString(taskNumber + 1) 
-                               + "_0");
-               
-               this.configuration.set("mapred.task.id", 
taskAttemptID.toString());
-               this.configuration.setInt("mapred.task.partition", taskNumber + 
1);
-               // for hadoop 2.2
-               this.configuration.set("mapreduce.task.attempt.id", 
taskAttemptID.toString());
-               this.configuration.setInt("mapreduce.task.partition", 
taskNumber + 1);
-               
-               try {
-                       this.context = 
HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               this.fileOutputCommitter = new FileOutputCommitter(new 
Path(this.configuration.get("mapred.output.dir")), context);
-               
-               try {
-                       
this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration,
 new JobID()));
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               // compatible for hadoop 2.2.0, the temporary output directory 
is different from hadoop 1.2.1
-               this.configuration.set("mapreduce.task.output.dir", 
this.fileOutputCommitter.getWorkPath().toString());
-               
-               try {
-                       this.recordWriter = 
this.mapreduceOutputFormat.getRecordWriter(this.context);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not create RecordWriter.", 
e);
-               }
-       }
-       
-       
-       @Override
-       public void writeRecord(Tuple2<K, V> record) throws IOException {
-               try {
-                       this.recordWriter.write(record.f0, record.f1);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not write Record.", e);
-               }
-       }
-       
-       /**
-        * commit the task by moving the output file out from the temporary 
directory.
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       this.recordWriter.close(this.context);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not close RecordReader.", 
e);
-               }
-               
-               if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-                       this.fileOutputCommitter.commitTask(this.context);
-               }
-               
-               Path outputPath = new 
Path(this.configuration.get("mapred.output.dir"));
-               
-               // rename tmp-file to final name
-               FileSystem fs = FileSystem.get(outputPath.toUri(), 
this.configuration);
-               
-               String taskNumberStr = Integer.toString(this.taskNumber);
-               String tmpFileTemplate = "tmp-r-00000";
-               String tmpFile = 
tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-               
-               if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-                       fs.rename(new Path(outputPath.toString()+"/"+tmpFile), 
new Path(outputPath.toString()+"/"+taskNumberStr));
-               }
-       }
-       
-       @Override
-       public void finalizeGlobal(int parallelism) throws IOException {
-
-               JobContext jobContext;
-               TaskAttemptContext taskContext;
-               try {
-                       
-                       TaskAttemptID taskAttemptID = 
TaskAttemptID.forName("attempt__0000_r_" 
-                                       + String.format("%" + (6 - 
Integer.toString(1).length()) + "s"," ").replace(" ", "0") 
-                                       + Integer.toString(1) 
-                                       + "_0");
-                       
-                       jobContext = 
HadoopUtils.instantiateJobContext(this.configuration, new JobID());
-                       taskContext = 
HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-               this.fileOutputCommitter = new FileOutputCommitter(new 
Path(this.configuration.get("mapred.output.dir")), taskContext);
-               
-               // finalize HDFS output format
-               this.fileOutputCommitter.commitJob(jobContext);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
-               this.configuration.write(out);
-       }
-       
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               String hadoopOutputFormatClassName = in.readUTF();
-               
-               org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
-               configuration.readFields(in);
-               
-               if(this.configuration == null) {
-                       this.configuration = configuration;
-               }
-               
-               try {
-                       this.mapreduceOutputFormat = 
(org.apache.hadoop.mapreduce.OutputFormat<K,V>) 
Class.forName(hadoopOutputFormatClassName, true, 
Thread.currentThread().getContextClassLoader()).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the 
hadoop output format", e);
-               }
-       }
-}

Reply via email to