[FLINK-3511] [hadoop-compatibility] Move hadoop-compatibility examples to test scope


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/131f016e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/131f016e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/131f016e

Branch: refs/heads/release-1.0
Commit: 131f016e71540a5d1e264084c630b93de1aeabae
Parents: 0444792
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 26 16:12:59 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Fri Feb 26 20:57:21 2016 +0100

----------------------------------------------------------------------
 .../example/HadoopMapredCompatWordCount.java    | 133 -------------------
 .../mapreduce/example/WordCount.java            | 120 -----------------
 .../mapred/HadoopMapredITCase.java              |   2 +-
 .../example/HadoopMapredCompatWordCount.java    | 133 +++++++++++++++++++
 .../mapreduce/HadoopInputOutputITCase.java      |   2 +-
 .../mapreduce/example/WordCount.java            | 120 +++++++++++++++++
 6 files changed, 255 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
deleted file mode 100644
index 3547e47..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.example;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
- */
-public class HadoopMapredCompatWordCount {
-       
-       public static void main(String[] args) throws Exception {
-               if (args.length < 2) {
-                       System.err.println("Usage: WordCount <input path> 
<result path>");
-                       return;
-               }
-               
-               final String inputPath = args[0];
-               final String outputPath = args[1];
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // Set up the Hadoop Input Format
-               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, new JobConf());
-               TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), 
new Path(inputPath));
-               
-               // Create a Flink job with it
-               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
-               
-               DataSet<Tuple2<Text, LongWritable>> words = 
-                               text.flatMap(new 
HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
-                                       .groupBy(0).reduceGroup(new 
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new 
Counter(), new Counter()));
-               
-               // Set up Hadoop Output Format
-               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
-                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
-               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
-               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(outputPath));
-               
-               // Output & Execute
-               words.output(hadoopOutputFormat).setParallelism(1);
-               env.execute("Hadoop Compat WordCount");
-       }
-       
-       
-       public static final class Tokenizer implements Mapper<LongWritable, 
Text, Text, LongWritable> {
-
-               @Override
-               public void map(LongWritable k, Text v, OutputCollector<Text, 
LongWritable> out, Reporter rep) 
-                               throws IOException {
-                       // normalize and split the line
-                       String line = v.toString();
-                       String[] tokens = line.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Text(token), new 
LongWritable(1l));
-                               }
-                       }
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-               
-               @Override
-               public void close() throws IOException { }
-               
-       }
-       
-       public static final class Counter implements Reducer<Text, 
LongWritable, Text, LongWritable> {
-
-               @Override
-               public void reduce(Text k, Iterator<LongWritable> vs, 
OutputCollector<Text, LongWritable> out, Reporter rep)
-                               throws IOException {
-                       
-                       long cnt = 0;
-                       while(vs.hasNext()) {
-                               cnt += vs.next().get();
-                       }
-                       out.collect(k, new LongWritable(cnt));
-                       
-               }
-               
-               @Override
-               public void configure(JobConf arg0) { }
-               
-               @Override
-               public void close() throws IOException { }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
deleted file mode 100644
index f5758eb..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.example;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
- */
-@SuppressWarnings("serial")
-public class WordCount {
-       
-       public static void main(String[] args) throws Exception {
-               if (args.length < 2) {
-                       System.err.println("Usage: WordCount <input path> 
<result path>");
-                       return;
-               }
-               
-               final String inputPath = args[0];
-               final String outputPath = args[1];
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // Set up the Hadoop Input Format
-               Job job = Job.getInstance();
-               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, job);
-               TextInputFormat.addInputPath(job, new Path(inputPath));
-               
-               // Create a Flink job with it
-               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
-               
-               // Tokenize the line and convert from Writable "Text" to String 
for better handling
-               DataSet<Tuple2<String, Integer>> words = text.flatMap(new 
Tokenizer());
-               
-               // Sum up the words
-               DataSet<Tuple2<String, Integer>> result = 
words.groupBy(0).aggregate(Aggregations.SUM, 1);
-               
-               // Convert String back to Writable "Text" for use with Hadoop 
Output Format
-               DataSet<Tuple2<Text, IntWritable>> hadoopResult = 
result.map(new HadoopDatatypeMapper());
-               
-               // Set up Hadoop Output Format
-               HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new 
HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, 
IntWritable>(), job);
-               
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator",
 " ");
-               
hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", 
" "); // set the value for both, since this test
-               // is being executed with both types (hadoop1 and hadoop2 
profile)
-               TextOutputFormat.setOutputPath(job, new Path(outputPath));
-               
-               // Output & Execute
-               hadoopResult.output(hadoopOutputFormat);
-               env.execute("Word Count");
-       }
-       
-       /**
-        * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
-        */
-       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-               
-               @Override
-               public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
-                       // normalize and split the line
-                       String line = value.f1.toString();
-                       String[] tokens = line.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * Converts Java data types to Hadoop Writables.
-        */
-       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-               
-               @Override
-               public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {
-                       return new Tuple2<Text, IntWritable>(new 
Text(value.f0), new IntWritable(value.f1));
-               }
-               
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index bbb7503..ccc0d82 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import 
org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
+import 
org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
new file mode 100644
index 0000000..ce0143a
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
+ */
+public class HadoopMapredCompatWordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> 
<result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, new JobConf());
+               TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), 
new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
+               
+               DataSet<Tuple2<Text, LongWritable>> words = 
+                               text.flatMap(new 
HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+                                       .groupBy(0).reduceGroup(new 
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new 
Counter(), new Counter()));
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
+               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(outputPath));
+               
+               // Output & Execute
+               words.output(hadoopOutputFormat).setParallelism(1);
+               env.execute("Hadoop Compat WordCount");
+       }
+       
+       
+       public static final class Tokenizer implements Mapper<LongWritable, 
Text, Text, LongWritable> {
+
+               @Override
+               public void map(LongWritable k, Text v, OutputCollector<Text, 
LongWritable> out, Reporter rep) 
+                               throws IOException {
+                       // normalize and split the line
+                       String line = v.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Text(token), new 
LongWritable(1l));
+                               }
+                       }
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+               
+       }
+       
+       public static final class Counter implements Reducer<Text, 
LongWritable, Text, LongWritable> {
+
+               @Override
+               public void reduce(Text k, Iterator<LongWritable> vs, 
OutputCollector<Text, LongWritable> out, Reporter rep)
+                               throws IOException {
+                       
+                       long cnt = 0;
+                       while(vs.hasNext()) {
+                               cnt += vs.next().get();
+                       }
+                       out.collect(k, new LongWritable(cnt));
+                       
+               }
+               
+               @Override
+               public void configure(JobConf arg0) { }
+               
+               @Override
+               public void close() throws IOException { }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 9b4aeea..698e356 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapreduce;
 
-import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..3de3f72
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop 
Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop 
Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+       
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.err.println("Usage: WordCount <input path> 
<result path>");
+                       return;
+               }
+               
+               final String inputPath = args[0];
+               final String outputPath = args[1];
+               
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // Set up the Hadoop Input Format
+               Job job = Job.getInstance();
+               HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new 
HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), 
LongWritable.class, Text.class, job);
+               TextInputFormat.addInputPath(job, new Path(inputPath));
+               
+               // Create a Flink job with it
+               DataSet<Tuple2<LongWritable, Text>> text = 
env.createInput(hadoopInputFormat);
+               
+               // Tokenize the line and convert from Writable "Text" to String 
for better handling
+               DataSet<Tuple2<String, Integer>> words = text.flatMap(new 
Tokenizer());
+               
+               // Sum up the words
+               DataSet<Tuple2<String, Integer>> result = 
words.groupBy(0).aggregate(Aggregations.SUM, 1);
+               
+               // Convert String back to Writable "Text" for use with Hadoop 
Output Format
+               DataSet<Tuple2<Text, IntWritable>> hadoopResult = 
result.map(new HadoopDatatypeMapper());
+               
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new 
HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, 
IntWritable>(), job);
+               
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator",
 " ");
+               
hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", 
" "); // set the value for both, since this test
+               // is being executed with both types (hadoop1 and hadoop2 
profile)
+               TextOutputFormat.setOutputPath(job, new Path(outputPath));
+               
+               // Output & Execute
+               hadoopResult.output(hadoopOutputFormat);
+               env.execute("Word Count");
+       }
+       
+       /**
+        * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
+        */
+       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+               
+               @Override
+               public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
+                       // normalize and split the line
+                       String line = value.f1.toString();
+                       String[] tokens = line.toLowerCase().split("\\W+");
+                       
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Converts Java data types to Hadoop Writables.
+        */
+       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+               
+               @Override
+               public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {
+                       return new Tuple2<Text, IntWritable>(new 
Text(value.f0), new IntWritable(value.f1));
+               }
+               
+       }
+       
+}

Reply via email to