This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cbedff73193c90f2324951273baa71cdfd8c23f1 Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Nov 19 12:02:05 2021 +0100 [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc --- docs/content/docs/connectors/dataset/_index.md | 23 +++ .../docs/connectors/dataset/formats/_index.md | 23 +++ .../docs/connectors/dataset/formats/avro.md | 61 ++++++++ .../formats/azure_table_storage.md} | 96 +++---------- .../docs/connectors/dataset/formats/hadoop.md | 158 +++++++++++++++++++++ 5 files changed, 283 insertions(+), 78 deletions(-) diff --git a/docs/content/docs/connectors/dataset/_index.md b/docs/content/docs/connectors/dataset/_index.md new file mode 100644 index 0000000..3b0d8f6 --- /dev/null +++ b/docs/content/docs/connectors/dataset/_index.md @@ -0,0 +1,23 @@ +--- +title: DataSet Connectors +bookCollapseSection: true +weight: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> diff --git a/docs/content/docs/connectors/dataset/formats/_index.md b/docs/content/docs/connectors/dataset/formats/_index.md new file mode 100644 index 0000000..282fc69 --- /dev/null +++ b/docs/content/docs/connectors/dataset/formats/_index.md @@ -0,0 +1,23 @@ +--- +title: Formats +bookCollapseSection: true +weight: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> diff --git a/docs/content/docs/connectors/dataset/formats/avro.md b/docs/content/docs/connectors/dataset/formats/avro.md new file mode 100644 index 0000000..7320587 --- /dev/null +++ b/docs/content/docs/connectors/dataset/formats/avro.md @@ -0,0 +1,61 @@ +--- +title: "Avro" +weight: 4 +type: docs +aliases: +- /dev/batch/connectors/formats/avro.html + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + + +# Avro format + +Flink has built-in support for [Apache Avro](http://avro.apache.org/). This makes it easy to read and write Avro data based on an Avro schema with Flink. +The serialization framework of Flink is able to handle classes generated from Avro schemas. To use the Avro format, the following dependency is required for projects using a build automation tool (such as Maven or SBT). + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>{{< version >}}</version> +</dependency> +``` + +In order to read data from an Avro file, you have to specify an `AvroInputFormat`. + +**Example**: + +```java +AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); +DataSet<User> usersDS = env.createInput(users); +``` + +Note that `User` is a POJO generated by Avro. Flink also supports string-based key selection on these POJOs. For example: + +```java +usersDS.groupBy("name") +``` + + +Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use. + +Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you cannot use the field as a join or grouping key. 
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine, however specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible! diff --git a/docs/content/docs/connectors/dataset.md b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md similarity index 53% rename from docs/content/docs/connectors/dataset.md rename to docs/content/docs/connectors/dataset/formats/azure_table_storage.md index a4ddca3..4c45bfc 100644 --- a/docs/content/docs/connectors/dataset.md +++ b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md @@ -1,9 +1,10 @@ --- -title: "DataSet Connectors" -weight: 3 +title: "Microsoft Azure table" +weight: 4 type: docs aliases: - - /dev/batch/connectors.html +- /dev/batch/connectors/formats/azure_table_storage.html + --- <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -24,69 +25,12 @@ specific language governing permissions and limitations under the License. --> -# DataSet Connectors - -## Reading from and writing to file systems - -The Apache Flink project supports multiple [file systems]({{< ref "docs/deployment/filesystems/overview" >}}) that can be used as backing stores -for input and output connectors. - -## Connecting to other systems using Input/OutputFormat wrappers for Hadoop - -Apache Flink allows users to access many different systems as data sources or sinks. -The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept -of so called `InputFormat`s and `OutputFormat`s. - -One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows -users to use all existing Hadoop input formats with Flink. - -This section shows some examples for connecting Flink to other systems. 
-[Read more about Hadoop compatibility in Flink]({{< ref "docs/dev/dataset/hadoop_compatibility" >}}). - -## Avro support in Flink - -Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows to easily read from Avro files with Flink. -Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. Be sure to include the Flink Avro dependency to the pom.xml of your project. - -```xml -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro</artifactId> - <version>{{< version >}}</version> -</dependency> -``` - -In order to read data from an Avro file, you have to specify an `AvroInputFormat`. - -**Example**: - -```java -AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); -DataSet<User> usersDS = env.createInput(users); -``` - -Note that `User` is a POJO generated by Avro. Flink also allows to perform string-based key selection of these POJOs. For example: - -```java -usersDS.groupBy("name") -``` - - -Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use. - -Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you can not use the field as a join or grouping key. -Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine, however specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible! 
- - - -### Access Microsoft Azure Table Storage - -_Note: This example works starting from Flink 0.6-incubating_ +# Microsoft Azure Table Storage format This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop input format implementation for accessing [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/). 1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves. -Execute the following commands: + Execute the following commands: ```bash git clone https://github.com/mooso/azure-tables-hadoop.git @@ -104,29 +48,29 @@ curl https://flink.apache.org/q/quickstart.sh | bash ```xml <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId> - <version>{{< version >}}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId> + <version>{{< version >}}</version> </dependency> <dependency> - <groupId>com.microsoft.hadoop</groupId> - <artifactId>microsoft-hadoop-azure</artifactId> - <version>0.0.4</version> + <groupId>com.microsoft.hadoop</groupId> + <artifactId>microsoft-hadoop-azure</artifactId> + <version>0.0.5</version> </dependency> ``` `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers. `microsoft-hadoop-azure` is adding the project we've build before to our project. -The project is now prepared for starting to code. We recommend to import the project into an IDE, such as Eclipse or IntelliJ. (Import as a Maven project!). -Browse to the code of the `Job.java` file. Its an empty skeleton for a Flink job. +The project is now ready for starting to code. We recommend to import the project into an IDE, such as IntelliJ. You should import it as a Maven project. +Browse to the file `Job.java`. 
This is an empty skeleton for a Flink job. -Paste the following code into it: +Paste the following code: ```java import java.util.Map; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat; @@ -147,7 +91,7 @@ public class AzureTableExample { HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job()); // set the Account URI, something like: https://apacheflink.table.core.windows.net - hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO"); + hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO"); // set the secret storage key here hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO"); // set the table name here @@ -178,10 +122,6 @@ public class AzureTableExample { } ``` -The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to the DataSet. - -## Access MongoDB - -This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test). +The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). You can then apply all known transformations to the `DataSet`. 
{{< top >}} diff --git a/docs/content/docs/connectors/dataset/formats/hadoop.md b/docs/content/docs/connectors/dataset/formats/hadoop.md new file mode 100644 index 0000000..64702cb --- /dev/null +++ b/docs/content/docs/connectors/dataset/formats/hadoop.md @@ -0,0 +1,158 @@ +--- +title: "Hadoop" +weight: 4 +type: docs +aliases: +- /dev/batch/connectors/hadoop.html + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Hadoop formats + +## Project Configuration + +Support for Hadoop is contained in the `flink-hadoop-compatibility` +Maven module. + +Add the following dependency to your `pom.xml` to use hadoop + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId> + <version>{{< version >}}</version> +</dependency> +``` + +If you want to run your Flink application locally (e.g. 
from your IDE), you also need to add +a `hadoop-client` dependency such as: + +```xml +<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>2.8.3</version> + <scope>provided</scope> +</dependency> +``` + +## Using Hadoop InputFormats + +To use Hadoop `InputFormats` with Flink, the format must first be wrapped +using either `readHadoopFile` or `createHadoopInput` of the +`HadoopInputs` utility class. +The former is used for input formats derived +from `FileInputFormat` while the latter has to be used for general-purpose +input formats. +The resulting `InputFormat` can be used to create a data source by using +`ExecutionEnvironment#createInput`. + +The resulting `DataSet` contains 2-tuples where the first field +is the key and the second field is the value retrieved from the Hadoop +InputFormat. + +The following example shows how to use Hadoop's `TextInputFormat`. + +{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}} +{{< tab "Java" >}} + +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +DataSet<Tuple2<LongWritable, Text>> input = + env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath)); + +// Do something with the data. +[...] +``` + +{{< /tab >}} +{{< tab "Scala" >}} + +```scala +val env = ExecutionEnvironment.getExecutionEnvironment + +val input: DataSet[(LongWritable, Text)] = + env.createInput(HadoopInputs.readHadoopFile( + new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)) + +// Do something with the data. +[...] +``` + +{{< /tab >}} +{{< /tabs >}} + +## Using Hadoop OutputFormats + +Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class +that implements `org.apache.hadoop.mapred.OutputFormat` or extends +`org.apache.hadoop.mapreduce.OutputFormat` is supported. +The OutputFormat wrapper expects its input data to be a DataSet containing +2-tuples of key and value. 
These are to be processed by the Hadoop OutputFormat. + +The following example shows how to use Hadoop's `TextOutputFormat`. + +{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}} +{{< tab "Java" >}} + +```java +// Obtain the result we want to emit +DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...] + +// Set up the Hadoop TextOutputFormat. +HadoopOutputFormat<Text, IntWritable> hadoopOF = + // create the Flink wrapper. + new HadoopOutputFormat<Text, IntWritable>( + // set the Hadoop OutputFormat and specify the job. + new TextOutputFormat<Text, IntWritable>(), job + ); +hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); +TextOutputFormat.setOutputPath(job, new Path(outputPath)); + +// Emit data using the Hadoop TextOutputFormat. +hadoopResult.output(hadoopOF); +``` + +{{< /tab >}} +{{< tab "Scala" >}} + +```scala +// Obtain your result to emit. +val hadoopResult: DataSet[(Text, IntWritable)] = [...] + +val hadoopOF = new HadoopOutputFormat[Text,IntWritable]( + new TextOutputFormat[Text, IntWritable], + new JobConf) + +hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ") +FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath)) + +hadoopResult.output(hadoopOF) + + +``` + +{{< /tab >}} +{{< /tabs >}} + +{{< top >}}
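
A note on the `TextInputFormat` example in `hadoop.md`: with Hadoop's `TextInputFormat`, the key of each 2-tuple is the byte offset at which a line starts and the value is the text of that line. The plain-Java sketch below only imitates that record shape so it can be run without Flink or Hadoop on the classpath. `TextRecordSketch` and `toRecords` are hypothetical names introduced for illustration, `Map.Entry<Long, String>` stands in for `Tuple2<LongWritable, Text>`, and the sketch assumes UTF-8 input with `\n` line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink or Hadoop: it mimics the
// (byte offset, line) pairs that a text input format hands to a job.
public class TextRecordSketch {

    // Splits UTF-8 content on '\n' and pairs each line with its starting byte offset.
    static List<Map.Entry<Long, String>> toRecords(String content) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            boolean atEnd = (i == bytes.length);
            // Emit a record at every newline, and at end of input if a last line is pending.
            boolean emit = atEnd ? start < i : bytes[i] == '\n';
            if (emit) {
                String line = new String(bytes, start, i - start, StandardCharsets.UTF_8);
                records.add(new SimpleEntry<>((long) start, line));
                start = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Each entry plays the role of one Tuple2<LongWritable, Text>.
        for (Map.Entry<Long, String> record : toRecords("first line\nsecond\n")) {
            System.out.println(record.getKey() + " -> " + record.getValue());
        }
    }
}
```

In a real job the key is often dropped right after reading, for example with a map that keeps only the value field, because the offset rarely matters downstream.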
