Repository: flink
Updated Branches:
  refs/heads/master 7bc78cbf9 -> 8b3805ba5


http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 2b99fd2..f5758eb 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 
 /**
  * Implements a word count which takes the input file and counts the number of

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
deleted file mode 100644
index 86b730f..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-public class HadoopUtils {
-       
-       /**
-        * Merge HadoopConfiguration into Configuration. This is necessary for 
the HDFS configuration.
-        */
-       public static void mergeHadoopConf(Configuration configuration) {
-               Configuration hadoopConf = 
HadoopFileSystem.getHadoopConfiguration();
-               
-               for (Map.Entry<String, String> e : hadoopConf) {
-                       configuration.set(e.getKey(), e.getValue());
-               }
-       }
-       
-       public static JobContext instantiateJobContext(Configuration 
configuration, JobID jobId) throws Exception {
-               try {
-                       Class<?> clazz;
-                       // for Hadoop 1.xx
-                       if(JobContext.class.isInterface()) {
-                               clazz = 
Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       // for Hadoop 2.xx
-                       else {
-                               clazz = 
Class.forName("org.apache.hadoop.mapreduce.JobContext", true, 
Thread.currentThread().getContextClassLoader());
-                       }
-                       Constructor<?> constructor = 
clazz.getConstructor(Configuration.class, JobID.class);
-                       JobContext context = (JobContext) 
constructor.newInstance(configuration, jobId);
-                       
-                       return context;
-               } catch(Exception e) {
-                       throw new Exception("Could not create instance of 
JobContext.");
-               }
-       }
-       
-       public static TaskAttemptContext 
instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID 
taskAttemptID) throws Exception {
-               try {
-                       Class<?> clazz;
-                       // for Hadoop 1.xx
-                       if(JobContext.class.isInterface()) {
-                               clazz = 
Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-                       }
-                       // for Hadoop 2.xx
-                       else {
-                               clazz = 
Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
-                       }
-                       Constructor<?> constructor = 
clazz.getConstructor(Configuration.class, TaskAttemptID.class);
-                       TaskAttemptContext context = (TaskAttemptContext) 
constructor.newInstance(configuration, taskAttemptID);
-                       
-                       return context;
-               } catch(Exception e) {
-                       throw new Exception("Could not create instance of 
TaskAttemptContext.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
deleted file mode 100644
index 7477c28..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapreduce.JobContext;
-
-
-public class HadoopInputSplit extends LocatableInputSplit {
-       
-       public transient org.apache.hadoop.mapreduce.InputSplit 
mapreduceInputSplit;
-       public transient JobContext jobContext;
-       
-       private int splitNumber;
-       
-       public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
-               return mapreduceInputSplit;
-       }
-       
-       
-       public HadoopInputSplit() {
-               super();
-       }
-       
-       
-       public HadoopInputSplit(int splitNumber, 
org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext 
jobContext) {
-               this.splitNumber = splitNumber;
-               if(!(mapreduceInputSplit instanceof Writable)) {
-                       throw new IllegalArgumentException("InputSplit must 
implement Writable interface.");
-               }
-               this.mapreduceInputSplit = mapreduceInputSplit;
-               this.jobContext = jobContext;
-       }
-       
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeInt(this.splitNumber);
-               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-               Writable w = (Writable) this.mapreduceInputSplit;
-               w.write(out);
-       }
-       
-       @Override
-       public void read(DataInputView in) throws IOException {
-               this.splitNumber = in.readInt();
-               String className = in.readUTF();
-               
-               if(this.mapreduceInputSplit == null) {
-                       try {
-                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit = 
-                                               
Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-                               this.mapreduceInputSplit = 
(org.apache.hadoop.mapreduce.InputSplit) 
WritableFactories.newInstance(inputSplit);
-                       } catch (Exception e) {
-                               throw new RuntimeException("Unable to create 
InputSplit", e);
-                       }
-               }
-               ((Writable)this.mapreduceInputSplit).readFields(in);
-       }
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeInt(this.splitNumber);
-               out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-               Writable w = (Writable) this.mapreduceInputSplit;
-               w.write(out);
-
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               this.splitNumber=in.readInt();
-               String className = in.readUTF();
-
-               if(this.mapreduceInputSplit == null) {
-                       try {
-                               Class<? extends org.apache.hadoop.io.Writable> 
inputSplit =
-                                               
Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-                               this.mapreduceInputSplit = 
(org.apache.hadoop.mapreduce.InputSplit) 
WritableFactories.newInstance(inputSplit);
-                       } catch (Exception e) {
-                               throw new RuntimeException("Unable to create 
InputSplit", e);
-                       }
-               }
-               ((Writable)this.mapreduceInputSplit).readFields(in);
-       }
-       
-       @Override
-       public int getSplitNumber() {
-               return this.splitNumber;
-       }
-       
-       @Override
-       public String[] getHostnames() {
-               try {
-                       return this.mapreduceInputSplit.getLocations();
-               } catch (IOException e) {
-                       return new String[0];
-               } catch (InterruptedException e) {
-                       return new String[0];
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
deleted file mode 100644
index 32396b8..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBase {
-
-       private static int NUM_PROGRAMS = 2;
-
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String[] resultPath;
-       private String[] expectedResult;
-       private String sequenceFileInPath;
-       private String sequenceFileInPathNull;
-
-       public HadoopIOFormatsITCase(Configuration config) {
-               super(config);  
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = new String[] {getTempDirPath("result0"), 
getTempDirPath("result1") };
-
-               File sequenceFile = createAndRegisterTempFile("seqFile");
-               sequenceFileInPath = sequenceFile.toURI().toString();
-
-               // Create a sequence file
-               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
-               FileSystem fs = 
FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
-               Path path = new Path(sequenceFile.getAbsolutePath());
-
-               //  ------------------ Long / Text Key Value pair: ------------
-               int kvCount = 4;
-
-               LongWritable key = new LongWritable();
-               Text value = new Text();
-               SequenceFile.Writer writer = null;
-               try {
-                       writer = SequenceFile.createWriter( fs, conf, path, 
key.getClass(), value.getClass());
-                       for (int i = 0; i < kvCount; i ++) {
-                               if(i == 1) {
-                                       // write key = 0 a bit more often.
-                                       for(int a = 0;a < 15; a++) {
-                                               key.set(i);
-                                               value.set(i+" - somestring");
-                                               writer.append(key, value);
-                                       }
-                               }
-                               key.set(i);
-                               value.set(i+" - somestring");
-                               writer.append(key, value);
-                       }
-               } finally {
-                       IOUtils.closeStream(writer);
-               }
-
-
-               //  ------------------ Long / Text Key Value pair: ------------
-
-               File sequenceFileNull = 
createAndRegisterTempFile("seqFileNullKey");
-               sequenceFileInPathNull = sequenceFileNull.toURI().toString();
-               path = new Path(sequenceFileInPathNull);
-
-               LongWritable value1 = new LongWritable();
-               SequenceFile.Writer writer1 = null;
-               try {
-                       writer1 = SequenceFile.createWriter( fs, conf, path, 
NullWritable.class, value1.getClass());
-                       for (int i = 0; i < kvCount; i ++) {
-                               value1.set(i);
-                               writer1.append(NullWritable.get(), value1);
-                       }
-               } finally {
-                       IOUtils.closeStream(writer1);
-               }
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, 
resultPath, sequenceFileInPath, sequenceFileInPathNull);
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               for(int i = 0; i < resultPath.length; i++) {
-                       compareResultsByLinesInMemory(expectedResult[i], 
resultPath[i]);
-               }
-       }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
-       }
-       
-       public static class HadoopIOFormatPrograms {
-               
-               public static String[] runProgram(int progId, String 
resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws 
Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               /**
-                                * Test sequence file, including a key access.
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                               SequenceFileInputFormat<LongWritable, Text> 
sfif = new SequenceFileInputFormat<LongWritable, Text>();
-                               JobConf hdconf = new JobConf();
-                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPath));
-                               HadoopInputFormat<LongWritable, Text> hif = new 
HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, 
hdconf);
-                               DataSet<Tuple2<LongWritable, Text>> ds = 
env.createInput(hif);
-                               DataSet<Tuple2<Long, Text>> sumed = ds.map(new 
MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
-                                       @Override
-                                       public Tuple2<Long, Text> 
map(Tuple2<LongWritable, Text> value) throws Exception {
-                                               return new Tuple2<Long, 
Text>(value.f0.get(), value.f1);
-                                       }
-                               }).sum(0);
-                               sumed.writeAsText(resultPath[0]);
-                               DataSet<String> res = ds.distinct(0).map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
-                                       @Override
-                                       public String map(Tuple2<LongWritable, 
Text> value) throws Exception {
-                                               return value.f1 + " - " + 
value.f0.get();
-                                       }
-                               });
-                               res.writeAsText(resultPath[1]);
-                               env.execute();
-                               
-                               // return expected result
-                               return  new String [] {"(21,3 - somestring)", 
"0 - somestring - 0\n" +
-                                               "1 - somestring - 1\n" +
-                                               "2 - somestring - 2\n" +
-                                               "3 - somestring - 3\n"};
-
-                       }
-                       case 2: {
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                               SequenceFileInputFormat<NullWritable, 
LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
-                               JobConf hdconf = new JobConf();
-                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPathNull));
-                               HadoopInputFormat<NullWritable, LongWritable> 
hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, 
NullWritable.class, LongWritable.class, hdconf);
-                               DataSet<Tuple2<NullWritable, LongWritable>> ds 
= env.createInput(hif);
-                               DataSet<Tuple2<Void, Long>> res = ds.map(new 
MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
-                                       @Override
-                                       public Tuple2<Void, Long> 
map(Tuple2<NullWritable, LongWritable> value) throws Exception {
-                                               return new Tuple2<Void, 
Long>(null, value.f1.get());
-                                       }
-                               });
-                               DataSet<Tuple2<Void, Long>> res1 = 
res.groupBy(1).sum(1);
-                               res1.writeAsText(resultPath[1]);
-                               res.writeAsText(resultPath[0]);
-                               env.execute();
-
-                               // return expected result
-                               return  new String [] {"(null,2)\n" +
-                                               "(null,0)\n" +
-                                               "(null,1)\n" +
-                                               "(null,3)",
-                                               "(null,0)\n" +
-                                               "(null,1)\n" +
-                                               "(null,2)\n" +
-                                               "(null,3)"};
-                       }
-                       default:
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
-                       
-               }
-       
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
deleted file mode 100644
index 00fd1f9..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-
-public class HadoopInputFormatTest {
-
-
-       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, 
T> {
-
-               public DummyVoidKeyInputFormat() {
-               }
-
-               @Override
-               public org.apache.hadoop.mapred.RecordReader<Void, T> 
getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf 
jobConf, Reporter reporter) throws IOException {
-                       return null;
-               }
-       }
-       
-       
-       @Test
-       public void checkTypeInformation() {
-               try {
-                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                       // Set up the Hadoop Input Format
-                       Job job = Job.getInstance();
-                       HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, 
Long.class, new JobConf());
-
-                       TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
-                       TypeInformation<Tuple2<Void,Long>> testTupleType = new 
TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO);
-                       
-                       if(tupleType.isTupleType()) {
-                               
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
-                                       fail("Tuple type information was not 
set correctly!");
-                               }
-                       } else {
-                               fail("Type information was not set to tuple 
type information!");
-                       }
-
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
deleted file mode 100644
index d79afaa..0000000
--- 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-
-
-public class HadoopInputFormatTest {
-
-
-       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, 
T> {
-
-               public DummyVoidKeyInputFormat() {
-               }
-
-               @Override
-               public RecordReader<Void, T> createRecordReader(InputSplit 
inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
-                       return null;
-               }
-       }
-       
-       
-       @Test
-       public void checkTypeInformation() {
-               try {
-                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                       // Set up the Hadoop Input Format
-                       Job job = Job.getInstance();
-                       HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, 
Long.class, job);
-
-                       TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
-                       TypeInformation<Tuple2<Void,Long>> testTupleType = new 
TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO);
-                       
-                       if(tupleType.isTupleType()) {
-                               
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
-                                       fail("Tuple type information was not 
set correctly!");
-                               }
-                       } else {
-                               fail("Type information was not set to tuple 
type information!");
-                       }
-
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index bcb9764..7bf3e2e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -116,12 +116,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
                
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-                       <scope>test</scope>
-               </dependency>
+               <!--<dependency>-->
+                       <!--<groupId>com.google.guava</groupId>-->
+                       <!--<artifactId>guava</artifactId>-->
+                       <!--<version>${guava.version}</version>-->
+                       <!--<scope>test</scope>-->
+               <!--</dependency>-->
                
                <dependency>
                        <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..0cb1ac5
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapred;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+       private static int NUM_PROGRAMS = 2;
+
+       private int curProgId = config.getInteger("ProgramId", -1);
+       private String[] resultPath;
+       private String[] expectedResult;
+       private String sequenceFileInPath;
+       private String sequenceFileInPathNull;
+
+       public HadoopIOFormatsITCase(Configuration config) {
+               super(config);  
+       }
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = new String[] {getTempDirPath("result0"), 
getTempDirPath("result1") };
+
+               File sequenceFile = createAndRegisterTempFile("seqFile");
+               sequenceFileInPath = sequenceFile.toURI().toString();
+
+               // Create a sequence file
+               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
+               FileSystem fs = 
FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+               Path path = new Path(sequenceFile.getAbsolutePath());
+
+               //  ------------------ Long / Text Key Value pair: ------------
+               int kvCount = 4;
+
+               LongWritable key = new LongWritable();
+               Text value = new Text();
+               SequenceFile.Writer writer = null;
+               try {
+                       writer = SequenceFile.createWriter( fs, conf, path, 
key.getClass(), value.getClass());
+                       for (int i = 0; i < kvCount; i ++) {
+                               if(i == 1) {
+                                       // write key = 1 a bit more often.
+                                       for(int a = 0;a < 15; a++) {
+                                               key.set(i);
+                                               value.set(i+" - somestring");
+                                               writer.append(key, value);
+                                       }
+                               }
+                               key.set(i);
+                               value.set(i+" - somestring");
+                               writer.append(key, value);
+                       }
+               } finally {
+                       IOUtils.closeStream(writer);
+               }
+
+
+               //  ------------------ Null / Long Key Value pair: ------------
+
+               File sequenceFileNull = 
createAndRegisterTempFile("seqFileNullKey");
+               sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+               path = new Path(sequenceFileInPathNull);
+
+               LongWritable value1 = new LongWritable();
+               SequenceFile.Writer writer1 = null;
+               try {
+                       writer1 = SequenceFile.createWriter( fs, conf, path, 
NullWritable.class, value1.getClass());
+                       for (int i = 0; i < kvCount; i ++) {
+                               value1.set(i);
+                               writer1.append(NullWritable.get(), value1);
+                       }
+               } finally {
+                       IOUtils.closeStream(writer1);
+               }
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, 
resultPath, sequenceFileInPath, sequenceFileInPathNull);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               for(int i = 0; i < resultPath.length; i++) {
+                       compareResultsByLinesInMemory(expectedResult[i], 
resultPath[i]);
+               }
+       }
+       
+       @Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+               
+               return TestBaseUtils.toParameterList(tConfigs);
+       }
+       
+       public static class HadoopIOFormatPrograms {
+               
+               public static String[] runProgram(int progId, String 
resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws 
Exception {
+                       
+                       switch(progId) {
+                       case 1: {
+                               /**
+                                * Test sequence file, including a key access.
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               SequenceFileInputFormat<LongWritable, Text> 
sfif = new SequenceFileInputFormat<LongWritable, Text>();
+                               JobConf hdconf = new JobConf();
+                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPath));
+                               HadoopInputFormat<LongWritable, Text> hif = new 
HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, 
hdconf);
+                               DataSet<Tuple2<LongWritable, Text>> ds = 
env.createInput(hif);
+                               DataSet<Tuple2<Long, Text>> sumed = ds.map(new 
MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+                                       @Override
+                                       public Tuple2<Long, Text> 
map(Tuple2<LongWritable, Text> value) throws Exception {
+                                               return new Tuple2<Long, 
Text>(value.f0.get(), value.f1);
+                                       }
+                               }).sum(0);
+                               sumed.writeAsText(resultPath[0]);
+                               DataSet<String> res = ds.distinct(0).map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
+                                       @Override
+                                       public String map(Tuple2<LongWritable, 
Text> value) throws Exception {
+                                               return value.f1 + " - " + 
value.f0.get();
+                                       }
+                               });
+                               res.writeAsText(resultPath[1]);
+                               env.execute();
+                               
+                               // return expected result
+                               return  new String [] {"(21,3 - somestring)", 
"0 - somestring - 0\n" +
+                                               "1 - somestring - 1\n" +
+                                               "2 - somestring - 2\n" +
+                                               "3 - somestring - 3\n"};
+
+                       }
+                       case 2: {
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               SequenceFileInputFormat<NullWritable, 
LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+                               JobConf hdconf = new JobConf();
+                               SequenceFileInputFormat.addInputPath(hdconf, 
new Path(sequenceFileInPathNull));
+                               HadoopInputFormat<NullWritable, LongWritable> 
hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, 
NullWritable.class, LongWritable.class, hdconf);
+                               DataSet<Tuple2<NullWritable, LongWritable>> ds 
= env.createInput(hif);
+                               DataSet<Tuple2<Void, Long>> res = ds.map(new 
MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+                                       @Override
+                                       public Tuple2<Void, Long> 
map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+                                               return new Tuple2<Void, 
Long>(null, value.f1.get());
+                                       }
+                               });
+                               DataSet<Tuple2<Void, Long>> res1 = 
res.groupBy(1).sum(1);
+                               res1.writeAsText(resultPath[1]);
+                               res.writeAsText(resultPath[0]);
+                               env.execute();
+
+                               // return expected result
+                               return  new String [] {"(null,2)\n" +
+                                               "(null,0)\n" +
+                                               "(null,1)\n" +
+                                               "(null,3)",
+                                               "(null,0)\n" +
+                                               "(null,1)\n" +
+                                               "(null,2)\n" +
+                                               "(null,3)"};
+                       }
+                       default:
+                               throw new IllegalArgumentException("Invalid 
program id");
+                       }
+                       
+               }
+       
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
new file mode 100644
index 0000000..037610e
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapred;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+public class WordCountMapredITCase extends JavaProgramTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       public WordCountMapredITCase(){
+//             setDegreeOfParallelism(4);
+//             setNumTaskManagers(2);
+//             setTaskManagerNumSlots(2);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[] {".", "_"});
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+//             env.setDegreeOfParallelism(1);
+
+
+               DataSet<Tuple2<LongWritable, Text>> input = 
env.readHadoopFile(new TextInputFormat(),
+                               LongWritable.class, Text.class, textPath);
+
+               DataSet<String> text = input.map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
+                       @Override
+                       public String map(Tuple2<LongWritable, Text> value) 
throws Exception {
+                               return value.f1.toString();
+                       }
+               });
+
+
+               DataSet<Tuple2<String, Integer>> counts =
+                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                                               // group by the tuple field "0" 
and sum up tuple field "1"
+                                               .groupBy(0)
+                                               .sum(1);
+
+               DataSet<Tuple2<Text, LongWritable>> words = counts.map(new 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+                       @Override
+                       public Tuple2<Text, LongWritable> map(Tuple2<String, 
Integer> value) throws Exception {
+                               return new Tuple2<Text, LongWritable>(new 
Text(value.f0), new LongWritable(value.f1));
+                       }
+               });
+
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
+               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(resultPath));
+
+               // Output & Execute
+               words.output(hadoopOutputFormat);
+               env.execute("Hadoop Compat WordCount");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
new file mode 100644
index 0000000..3bdaa22
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapreduce;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class WordCountMapreduceITCase extends JavaProgramTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       public WordCountMapreduceITCase(){
+//             setDegreeOfParallelism(4);
+//             setNumTaskManagers(2);
+//             setTaskManagerNumSlots(2);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[] {".", "_"});
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+
+               DataSet<Tuple2<LongWritable, Text>> input = 
env.readHadoopFile(new TextInputFormat(),
+                               LongWritable.class, Text.class, textPath);
+
+               DataSet<String> text = input.map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
+                       @Override
+                       public String map(Tuple2<LongWritable, Text> value) 
throws Exception {
+                               return value.f1.toString();
+                       }
+               });
+
+
+               DataSet<Tuple2<String, Integer>> counts =
+                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                                               // group by the tuple field "0" 
and sum up tuple field "1"
+                                               .groupBy(0)
+                                               .sum(1);
+
+               DataSet<Tuple2<Text, LongWritable>> words = counts.map(new 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+                       @Override
+                       public Tuple2<Text, LongWritable> map(Tuple2<String, 
Integer> value) throws Exception {
+                               return new Tuple2<Text, LongWritable>(new 
Text(value.f0), new LongWritable(value.f1));
+                       }
+               });
+
+               // Set up Hadoop Output Format
+               Job job = Job.getInstance();
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), job);
+               job.getConfiguration().set("mapred.textoutputformat.separator", 
" ");
+               TextOutputFormat.setOutputPath(job, new Path(resultPath));
+
+               // Output & Execute
+               words.output(hadoopOutputFormat);
+               env.execute("Hadoop Compat WordCount");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
new file mode 100644
index 0000000..c8d6639
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, 
TextInputFormat}
+
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
Array[String](".", "_"))
+  }
+
+  protected def testProgram() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], 
classOf[Text], textPath)
+
+    val text = input map { _._2.toString }
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      new JobConf)
+    hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
+
+    FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new 
Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
new file mode 100644
index 0000000..8988baf
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, 
TextOutputFormat}
+
+class WordCountMapreduceITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
Array[String](".", "_"))
+  }
+
+  protected def testProgram() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], 
classOf[Text], textPath)
+
+    val text = input map { _._2.toString }
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val job = Job.getInstance()
+    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      job)
+    
hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " 
")
+
+    FileOutputFormat.setOutputPath(job, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

Reply via email to