This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4913bc83383ee4ff33c99a9fc27a34b2eda8803d
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Nov 5 14:37:02 2021 +0100

    [FLINK-21407][doc][formats] Move haddop input and output formats to hadoop.md formats page
---
 .../docs/connectors/datastream/formats/hadoop.md   | 184 ++++++++++++++++++++-
 ...adoop_compatibility.md => hadoop_map_reduce.md} | 117 +------------
 2 files changed, 187 insertions(+), 114 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index c6159cc..0756f02 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -28,11 +28,185 @@ under the License.

 # Hadoop formats

-Apache Flink allows users to access many different systems as data sources.
-The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
-of so called `InputFormat`s
+## Project Configuration

-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
-users to use all existing Hadoop input formats with Flink.
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` API.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+    <version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.3</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source by using
+`ExecutionEnvironmen#createInput`.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+```

 {{< top >}}
diff --git a/docs/content/docs/dev/dataset/hadoop_compatibility.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
similarity index 67%
rename from docs/content/docs/dev/dataset/hadoop_compatibility.md
rename to docs/content/docs/dev/dataset/hadoop_map_reduce.md
index acbaf6b..c75c3c5 100644
--- a/docs/content/docs/dev/dataset/hadoop_compatibility.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -1,9 +1,9 @@
 ---
-title: "Hadoop Compatibility"
+title: "Hadoop MapReduce compatibility with Flink"
 weight: 8
 type: docs
 aliases:
-  - /dev/batch/hadoop_compatibility.html
+  - /dev/batch/hadoop_map_reduce.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->

-# Hadoop Compatibility
+# Flink and Map Reduce compatibility

 Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
 reusing code that was implemented for Hadoop MapReduce.
@@ -32,15 +32,15 @@ reusing code that was implemented for Hadoop MapReduce.
 You can:

 - use Hadoop's `Writable` [data types]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}#supported-data-types) in Flink programs.
-- use any Hadoop `InputFormat` as a [DataSource]({{ ref "docs/dev/dataset/overview" >}}#data-sources).
-- use any Hadoop `OutputFormat` as a [DataSink]({{ ref "docs/dev/dataset/overview" >}}#data-sinks).
+- use any Hadoop `InputFormat` as a [DataSource]({{ ref "docs/dev/connectors/formats/hadoop.html" >}}#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink]({{ ref "docs/dev/connectors/formats/hadoop.html" >}}#data-sinks).
 - use a Hadoop `Mapper` as [FlatMapFunction]({{ ref "docs/dev/dataset/transformations" >}}#flatmap).
 - use a Hadoop `Reducer` as [GroupReduceFunction]({{ ref "docs/dev/dataset/transformations" >}}#groupreduce-on-grouped-dataset).

 This document shows how to use existing Hadoop MapReduce code with Flink.
 Please refer to the [Connecting to other systems]({{< ref "docs/deployment/filesystems/overview" >}}#hadoop-file-system-hdfs-and-its-other-implementations) guide for reading from Hadoop supported file systems.

-### Project Configuration
+## Project Configuration

 Support for Hadoop input/output formats is part of the `flink-java` and
 `flink-scala` Maven modules that are always required when writing Flink jobs.
@@ -76,108 +76,7 @@ a `hadoop-client` dependency such as:
 </dependency>
 ```

-### Using Hadoop InputFormats
-
-To use Hadoop `InputFormats` with Flink the format must first be wrapped
-using either `readHadoopFile` or `createHadoopInput` of the
-`HadoopInputs` utility class.
-The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-The resulting `InputFormat` can be used to create a data source by using
-`ExecutionEnvironmen#createInput`.
-
-The resulting `DataSet` contains 2-tuples where the first field
-is the key and the second field is the value retrieved from the Hadoop
-InputFormat.
-
-The following example shows how to use Hadoop's `TextInputFormat`.
-
-{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
-{{< tab "Java" >}}
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<Tuple2<LongWritable, Text>> input =
-    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
-        LongWritable.class, Text.class, textPath));
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val input: DataSet[(LongWritable, Text)] =
-  env.createInput(HadoopInputs.readHadoopFile(
-    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop OutputFormats
-
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
-that implements `org.apache.hadoop.mapred.OutputFormat` or extends
-`org.apache.hadoop.mapreduce.OutputFormat` is supported.
-The OutputFormat wrapper expects its input data to be a DataSet containing
-2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
-
-The following example shows how to use Hadoop's `TextOutputFormat`.
-
-{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
-{{< tab "Java" >}}
-
-```java
-// Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, IntWritable> hadoopOF =
-  // create the Flink wrapper.
-  new HadoopOutputFormat<Text, IntWritable>(
-    // set the Hadoop OutputFormat and specify the job.
-    new TextOutputFormat<Text, IntWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-hadoopResult.output(hadoopOF);
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-// Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
-
-val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
-  new TextOutputFormat[Text, IntWritable],
-  new JobConf)
-
-hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
-FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
-
-hadoopResult.output(hadoopOF)
-
-
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop Mappers and Reducers
+## Using Hadoop Mappers and Reducers

 Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapr [...]

@@ -211,7 +110,7 @@ DataSet<Tuple2<Text, LongWritable>> result = text

 **Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.

-### Complete Hadoop WordCount Example
+## Complete Hadoop WordCount Example

 The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
