[FLINK-3511] [hadoop-compatibility] Move hadoop-compatibility examples to test scope
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/131f016e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/131f016e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/131f016e Branch: refs/heads/release-1.0 Commit: 131f016e71540a5d1e264084c630b93de1aeabae Parents: 0444792 Author: Till Rohrmann <[email protected]> Authored: Fri Feb 26 16:12:59 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Feb 26 20:57:21 2016 +0100 ---------------------------------------------------------------------- .../example/HadoopMapredCompatWordCount.java | 133 ------------------- .../mapreduce/example/WordCount.java | 120 ----------------- .../mapred/HadoopMapredITCase.java | 2 +- .../example/HadoopMapredCompatWordCount.java | 133 +++++++++++++++++++ .../mapreduce/HadoopInputOutputITCase.java | 2 +- .../mapreduce/example/WordCount.java | 120 +++++++++++++++++ 6 files changed, 255 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java deleted file mode 100644 index 3547e47..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapred.example; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; -import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; -import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - - - -/** - * Implements a word count which takes the input file and counts the number of - * occurrences of each word in the file and writes the result back to disk. - * - * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to - * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. - */ -public class HadoopMapredCompatWordCount { - - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: WordCount <input path> <result path>"); - return; - } - - final String inputPath = args[0]; - final String outputPath = args[1]; - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf()); - TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath)); - - // Create a Flink job with it - DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); - - DataSet<Tuple2<Text, LongWritable>> words = - text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())) - .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter())); - - // Set up Hadoop Output Format - HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = - new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf()); - hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); - TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath)); - - // Output & Execute - words.output(hadoopOutputFormat).setParallelism(1); - env.execute("Hadoop Compat WordCount"); - } - - - public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> { - - @Override - public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) - throws IOException { - // normalize and split the line - String line = v.toString(); - String[] tokens = line.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Text(token), new LongWritable(1l)); - } - } - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - - } - - public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> { - - @Override - public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep) - throws IOException { - - long cnt = 0; - while(vs.hasNext()) { - cnt += vs.next().get(); - } - out.collect(k, new LongWritable(cnt)); - - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java deleted file mode 100644 index f5758eb..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapreduce.example; - -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; - -/** - * Implements a word count which takes the input file and counts the number of - * occurrences of each word in the file and writes the result back to disk. - * - * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to - * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. - */ -@SuppressWarnings("serial") -public class WordCount { - - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: WordCount <input path> <result path>"); - return; - } - - final String inputPath = args[0]; - final String outputPath = args[1]; - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - Job job = Job.getInstance(); - HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job); - TextInputFormat.addInputPath(job, new Path(inputPath)); - - // Create a Flink job with it - DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); - - // Tokenize the line and convert from Writable "Text" to String for better handling - DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer()); - - // Sum up the words - DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); - - // Convert String back to Writable "Text" for use with Hadoop Output Format - DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper()); - - // Set up Hadoop Output Format - HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job); - hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); - hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test - // is being executed with both types (hadoop1 and hadoop2 profile) - TextOutputFormat.setOutputPath(job, new Path(outputPath)); - - // Output & Execute - hadoopResult.output(hadoopOutputFormat); - env.execute("Word Count"); - } - - /** - * Splits a line into words and converts Hadoop Writables into normal Java data types. - */ - public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { - - @Override - public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String line = value.f1.toString(); - String[] tokens = line.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - /** - * Converts Java data types to Hadoop Writables. - */ - public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { - - @Override - public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { - return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1)); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java index bbb7503..ccc0d82 100644 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.hadoopcompatibility.mapred; -import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; +import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java new file mode 100644 index 0000000..ce0143a --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred.example; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + + + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. + */ +public class HadoopMapredCompatWordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount <input path> <result path>"); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf()); + TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath)); + + // Create a Flink job with it + DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); + + DataSet<Tuple2<Text, LongWritable>> words = + text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())) + .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter())); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = + new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf()); + hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath)); + + // Output & Execute + words.output(hadoopOutputFormat).setParallelism(1); + env.execute("Hadoop Compat WordCount"); + } + + + public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> { + + @Override + public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) + throws IOException { + // normalize and split the line + String line = v.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Text(token), new LongWritable(1l)); + } + } + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + + } + + public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> { + + @Override + public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep) + throws IOException { + + long cnt = 0; + while(vs.hasNext()) { + cnt += vs.next().get(); + } + out.collect(k, new LongWritable(cnt)); + + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java index 9b4aeea..698e356 100644 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.hadoopcompatibility.mapreduce; -import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount; +import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/131f016e/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java new file mode 100644 index 0000000..3de3f72 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapreduce.example; + +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. + */ +@SuppressWarnings("serial") +public class WordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount <input path> <result path>"); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job); + TextInputFormat.addInputPath(job, new Path(inputPath)); + + // Create a Flink job with it + DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); + + // Tokenize the line and convert from Writable "Text" to String for better handling + DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer()); + + // Sum up the words + DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); + + // Convert String back to Writable "Text" for use with Hadoop Output Format + DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper()); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job); + hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); + hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test + // is being executed with both types (hadoop1 and hadoop2 profile) + TextOutputFormat.setOutputPath(job, new Path(outputPath)); + + // Output & Execute + hadoopResult.output(hadoopOutputFormat); + env.execute("Word Count"); + } + + /** + * Splits a line into words and converts Hadoop Writables into normal Java data types. + */ + public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { + + @Override + public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String line = value.f1.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + + /** + * Converts Java data types to Hadoop Writables. + */ + public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { + + @Override + public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1)); + } + + } + +}
