This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 846cac2 [FLINK-17872][doc] Add document for writing Avro files with
StreamingFileSink
846cac2 is described below
commit 846cac212a40efc8238bdd0679b81fb772061258
Author: Yun Gao <[email protected]>
AuthorDate: Tue Jun 9 19:43:41 2020 +0800
[FLINK-17872][doc] Add document for writing Avro files with
StreamingFileSink
This closes #12321
---
docs/dev/connectors/streamfile_sink.md | 101 ++++++++++++++++++++++++++++++
docs/dev/connectors/streamfile_sink.zh.md | 100 +++++++++++++++++++++++++++++
2 files changed, 201 insertions(+)
diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index 271019a..1cc3631 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -139,6 +139,7 @@ are finalized for further encoding purposes.
Flink comes with five built-in BulkWriter factories:
- [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
+ - [AvroWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriterFactory.html)
- [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
- [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
- [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
@@ -204,6 +205,106 @@ input.addSink(sink)
</div>
</div>
+#### Avro Format
+
+Flink also provides built-in support for writing data into Avro files. The
+[AvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriters.html) class
+offers convenience methods for creating Avro writer factories for specific, generic, and reflect-based records
+(`forSpecificRecord`, `forGenericRecord`, and `forReflectRecord`); see its documentation for details.
+
+To use the Avro writers in your application, add the following dependency:
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro</artifactId>
+    <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A StreamingFileSink that writes data to Avro files can be created like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
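+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroWriters;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+Path outputBasePath = ...;
+Schema schema = ...;
+DataStream<GenericRecord> input = ...;
+
+// Bulk-encode the GenericRecords into Avro files below outputBasePath
+final StreamingFileSink<GenericRecord> sink = StreamingFileSink
+    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+    .build();
+
+input.addSink(sink);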
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
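+import org.apache.flink.core.fs.Path
+import org.apache.flink.formats.avro.AvroWriters
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala._
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+
+val outputBasePath: Path = ...
+val schema: Schema = ...
+val input: DataStream[GenericRecord] = ...
+
+// Bulk-encode the GenericRecords into Avro files below outputBasePath
+val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
+  .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+  .build()
+
+input.addSink(sink)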
+{% endhighlight %}
+</div>
+</div>
+
+To customize the Avro writer, e.g. to enable compression, create an `AvroWriterFactory`
+with your own implementation of the [AvroBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroBuilder.html) interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
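+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+
+// The builder is called once per part file; this one enables Snappy compression
+AvroWriterFactory<Address> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
+    Schema schema = ReflectData.get().getSchema(Address.class);
+    DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
+
+    DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
+    dataFileWriter.setCodec(CodecFactory.snappyCodec());
+    dataFileWriter.create(schema, out);
+    return dataFileWriter;
+});
+
+Path outputBasePath = ...;
+DataStream<Address> stream = ...;
+stream.addSink(StreamingFileSink.forBulkFormat(
+    outputBasePath,
+    factory).build());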
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
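+import java.io.OutputStream
+
+import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala._
+
+// The builder is called once per part file; this one enables Snappy compression
+val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
+  override def createWriter(out: OutputStream): DataFileWriter[Address] = {
+    val schema = ReflectData.get.getSchema(classOf[Address])
+    val datumWriter = new ReflectDatumWriter[Address](schema)
+
+    val dataFileWriter = new DataFileWriter[Address](datumWriter)
+    dataFileWriter.setCodec(CodecFactory.snappyCodec)
+    dataFileWriter.create(schema, out)
+    dataFileWriter
+  }
+})
+
+val outputBasePath: Path = ...
+val stream: DataStream[Address] = ...
+stream.addSink(StreamingFileSink.forBulkFormat(
+  outputBasePath,
+  factory).build())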
+{% endhighlight %}
+</div>
+</div>
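
The custom-writer examples above use an `Address` type that the text does not define. The class below is a hypothetical minimal POJO that would work with the reflection-based builder: Avro's `ReflectData` derives the record schema from its non-static, non-transient fields, and the public no-argument constructor plus getters and setters also let Flink treat it as a POJO type. Note that, by default, reflection-derived schemas do not allow `null` field values, so every `String` field should be set before a record reaches the sink.

```java
/**
 * Hypothetical POJO for the reflection-based AvroBuilder examples.
 * ReflectData derives the Avro record schema from the fields below.
 */
public class Address {

    private String street;
    private String city;
    private int zipCode;

    /** No-argument constructor, needed by Flink's POJO rules and by Avro reflection when reading. */
    public Address() {}

    public Address(String street, String city, int zipCode) {
        this.street = street;
        this.city = city;
        this.zipCode = zipCode;
    }

    public String getStreet() { return street; }
    public void setStreet(String street) { this.street = street; }

    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }

    public int getZipCode() { return zipCode; }
    public void setZipCode(int zipCode) { this.zipCode = zipCode; }

    @Override
    public String toString() {
        return "Address{street=" + street + ", city=" + city + ", zipCode=" + zipCode + "}";
    }
}
```
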
+
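To sanity-check that the part files written by the sink really are Avro object container files, you can inspect their first bytes: the Avro specification defines the container-file header to start with `'O'`, `'b'`, `'j'` followed by the byte value `1`. The class name `AvroMagicCheck` and its methods below are hypothetical; this is a JDK-only sketch (Java 11+ for `InputStream.readNBytes`) that works on a local copy of a part file, not a Flink or Avro API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

/** Hypothetical helper that checks for the Avro object container file magic bytes. */
public class AvroMagicCheck {

    // Avro container files start with 'O', 'b', 'j' followed by the byte 1
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the given bytes begin with the Avro container magic. */
    public static boolean startsWithAvroMagic(byte[] bytes) {
        return bytes.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(bytes, MAGIC.length), MAGIC);
    }

    /** Reads only the first four bytes of a local file and checks them. */
    public static boolean isAvroContainerFile(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            return startsWithAvroMagic(in.readNBytes(MAGIC.length));
        }
    }

    /** Usage: java AvroMagicCheck <part-file> [<part-file> ...] */
    public static void main(String[] args) throws IOException {
        for (String arg : args) {
            Path file = Paths.get(arg);
            System.out.println(file + (isAvroContainerFile(file)
                    ? ": Avro container file"
                    : ": not an Avro container file"));
        }
    }
}
```

Check finished part files only; in-progress files may not have been flushed yet.
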
#### ORC Format
To enable the data to be bulk encoded in ORC format, Flink offers
[OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
diff --git a/docs/dev/connectors/streamfile_sink.zh.md b/docs/dev/connectors/streamfile_sink.zh.md
index 9f027a7..176092e 100644
--- a/docs/dev/connectors/streamfile_sink.zh.md
+++ b/docs/dev/connectors/streamfile_sink.zh.md
@@ -125,6 +125,7 @@ input.addSink(sink)
Flink has five built-in BulkWriter factories:
- [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
+ - [AvroWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriterFactory.html)
- [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
- [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
- [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
@@ -189,6 +190,105 @@ input.addSink(sink)
</div>
</div>
+#### Avro Format
+
+Flink also provides built-in support for writing data into Avro files. For the convenience methods that create an `AvroWriterFactory`, see
+[AvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriters.html).
+
+To use the Avro writers, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro</artifactId>
+    <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A StreamingFileSink operator that writes data to Avro files can be created as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
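+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroWriters;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+Path outputBasePath = ...;
+Schema schema = ...;
+DataStream<GenericRecord> input = ...;
+
+// Bulk-encode the GenericRecords into Avro files below outputBasePath
+final StreamingFileSink<GenericRecord> sink = StreamingFileSink
+    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+    .build();
+
+input.addSink(sink);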
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
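+import org.apache.flink.core.fs.Path
+import org.apache.flink.formats.avro.AvroWriters
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala._
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+
+val outputBasePath: Path = ...
+val schema: Schema = ...
+val input: DataStream[GenericRecord] = ...
+
+// Bulk-encode the GenericRecords into Avro files below outputBasePath
+val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
+  .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+  .build()
+
+input.addSink(sink)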
+{% endhighlight %}
+</div>
+</div>
+
+To create a custom Avro writer, for example one that enables compression, implement the [AvroBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroBuilder.html)
+interface and create an `AvroWriterFactory` instance yourself:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
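+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+
+// The builder is called once per part file; this one enables Snappy compression
+AvroWriterFactory<Address> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
+    Schema schema = ReflectData.get().getSchema(Address.class);
+    DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
+
+    DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
+    dataFileWriter.setCodec(CodecFactory.snappyCodec());
+    dataFileWriter.create(schema, out);
+    return dataFileWriter;
+});
+
+Path outputBasePath = ...;
+DataStream<Address> stream = ...;
+stream.addSink(StreamingFileSink.forBulkFormat(
+    outputBasePath,
+    factory).build());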
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
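+import java.io.OutputStream
+
+import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala._
+
+// The builder is called once per part file; this one enables Snappy compression
+val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
+  override def createWriter(out: OutputStream): DataFileWriter[Address] = {
+    val schema = ReflectData.get.getSchema(classOf[Address])
+    val datumWriter = new ReflectDatumWriter[Address](schema)
+
+    val dataFileWriter = new DataFileWriter[Address](datumWriter)
+    dataFileWriter.setCodec(CodecFactory.snappyCodec)
+    dataFileWriter.create(schema, out)
+    dataFileWriter
+  }
+})
+
+val outputBasePath: Path = ...
+val stream: DataStream[Address] = ...
+stream.addSink(StreamingFileSink.forBulkFormat(
+  outputBasePath,
+  factory).build())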
+{% endhighlight %}
+</div>
+</div>
+
#### ORC Format
To enable the data to be bulk encoded in ORC format, Flink offers
[OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)