This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 846cac2  [FLINK-17872][doc] Add document for writing Avro files with StreamingFileSink
846cac2 is described below

commit 846cac212a40efc8238bdd0679b81fb772061258
Author: Yun Gao <[email protected]>
AuthorDate: Tue Jun 9 19:43:41 2020 +0800

    [FLINK-17872][doc] Add document for writing Avro files with StreamingFileSink
    
    
    This closes #12321
---
 docs/dev/connectors/streamfile_sink.md    | 101 ++++++++++++++++++++++++++++++
 docs/dev/connectors/streamfile_sink.zh.md | 100 +++++++++++++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index 271019a..1cc3631 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -139,6 +139,7 @@ are finalized for further encoding purposes.
 Flink comes with four built-in BulkWriter factories:
 
  - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
+ - [AvroWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriterFactory.html)
  - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
  - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
  - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
@@ -204,6 +205,106 @@ input.addSink(sink)
 </div>
 </div>
 
+#### Avro format
+
+Flink also provides built-in support for writing data into Avro files. The
+[AvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriters.html) class
+lists the convenience methods for creating Avro writer factories, together with their documentation.
+
+To use the Avro writers in your application, you need to add the following dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A StreamingFileSink that writes data to Avro files can be created like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.formats.avro.AvroWriters;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+Schema schema = ...;
+DataStream<GenericRecord> input = ...;
+
+final StreamingFileSink<GenericRecord> sink = StreamingFileSink
+       .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+       .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.formats.avro.AvroWriters
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+
+val schema: Schema = ...
+val input: DataStream[GenericRecord] = ...
+
+val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
+    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+    .build()
+
+input.addSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+To create a customized Avro writer, for example one with compression enabled, create the `AvroWriterFactory`
+with a custom implementation of the [AvroBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroBuilder.html) interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AvroWriterFactory<Address> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
+       Schema schema = ReflectData.get().getSchema(Address.class);
+       DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
+
+       DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
+       dataFileWriter.setCodec(CodecFactory.snappyCodec()); // enable Snappy compression
+       dataFileWriter.create(schema, out);
+       return dataFileWriter;
+});
+
+DataStream<Address> stream = ...;
+stream.addSink(StreamingFileSink.forBulkFormat(
+       outputBasePath,
+       factory).build());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
+    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
+        val schema = ReflectData.get.getSchema(classOf[Address])
+        val datumWriter = new ReflectDatumWriter[Address](schema)
+
+        val dataFileWriter = new DataFileWriter[Address](datumWriter)
+        dataFileWriter.setCodec(CodecFactory.snappyCodec)
+        dataFileWriter.create(schema, out)
+        dataFileWriter
+    }
+})
+
+val stream: DataStream[Address] = ...
+stream.addSink(StreamingFileSink.forBulkFormat(
+    outputBasePath,
+    factory).build())
+{% endhighlight %}
+</div>
+</div>
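
The builder-based customization above follows a general pattern: the factory holds a small serializable function and calls it once per output stream to obtain a configured writer. The following is a minimal sketch of that pattern using only the JDK, not Flink's or Avro's actual implementation. `WriterBuilder`, `WriterFactory`, `gzipFactory`, `writeAll` and `gunzip` are hypothetical names introduced for illustration, and GZIP stands in for the Snappy codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BuilderFactorySketch {

    /** Hypothetical stand-in for a builder interface: wraps a raw stream in a configured writer. */
    @FunctionalInterface
    public interface WriterBuilder extends Serializable {
        OutputStream createWriter(OutputStream out) throws IOException;
    }

    /** Hypothetical factory: stores the builder and invokes it for every new output stream. */
    public static final class WriterFactory implements Serializable {
        private final WriterBuilder builder;

        public WriterFactory(WriterBuilder builder) {
            this.builder = builder;
        }

        public OutputStream create(OutputStream out) throws IOException {
            return builder.createWriter(out);
        }
    }

    /** The customization lives in the lambda: here it enables GZIP compression. */
    public static WriterFactory gzipFactory() {
        return new WriterFactory(out -> new GZIPOutputStream(out));
    }

    /** Writes each record as one UTF-8 line through a writer obtained from the factory. */
    public static byte[] writeAll(WriterFactory factory, String... records) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream writer = factory.create(sink)) {
            for (String record : records) {
                writer.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Decompresses GZIP bytes back into a string, to check what was written. */
    public static String gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] file = writeAll(gzipFactory(), "alice", "bob");
        System.out.print(gunzip(file));
    }
}
```

Keeping the configuration inside the builder means the factory itself stays generic: swapping the codec or the datum writer only changes the lambda, not the sink wiring.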
+
 #### ORC Format
  
 To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/orc/writers/OrcBulkWriterFactory.html) 
diff --git a/docs/dev/connectors/streamfile_sink.zh.md b/docs/dev/connectors/streamfile_sink.zh.md
index 9f027a7..176092e 100644
--- a/docs/dev/connectors/streamfile_sink.zh.md
+++ b/docs/dev/connectors/streamfile_sink.zh.md
@@ -125,6 +125,7 @@ input.addSink(sink)
 Flink has four built-in BulkWriter factories:
 
  - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
+ - [AvroWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriterFactory.html)
  - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
  - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
  - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
@@ -189,6 +190,105 @@ input.addSink(sink)
 </div>
 </div>
 
+#### Avro format
+
+Flink also provides built-in support for writing data into Avro files. For the convenience methods that create an AvroWriterFactory, see
+[AvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriters.html).
+
+To use the Avro writers, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A StreamingFileSink operator that writes data to Avro files can be created as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.formats.avro.AvroWriters;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+Schema schema = ...;
+DataStream<GenericRecord> input = ...;
+
+final StreamingFileSink<GenericRecord> sink = StreamingFileSink
+       .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+       .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.formats.avro.AvroWriters
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+
+val schema: Schema = ...
+val input: DataStream[GenericRecord] = ...
+
+val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
+    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
+    .build()
+
+input.addSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+To create a customized Avro writer, for example one with compression enabled, implement the [AvroBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroBuilder.html)
+interface and create an `AvroWriterFactory` instance yourself:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AvroWriterFactory<Address> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
+       Schema schema = ReflectData.get().getSchema(Address.class);
+       DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
+
+       DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
+       dataFileWriter.setCodec(CodecFactory.snappyCodec()); // enable Snappy compression
+       dataFileWriter.create(schema, out);
+       return dataFileWriter;
+});
+
+DataStream<Address> stream = ...;
+stream.addSink(StreamingFileSink.forBulkFormat(
+       outputBasePath,
+       factory).build());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
+    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
+        val schema = ReflectData.get.getSchema(classOf[Address])
+        val datumWriter = new ReflectDatumWriter[Address](schema)
+
+        val dataFileWriter = new DataFileWriter[Address](datumWriter)
+        dataFileWriter.setCodec(CodecFactory.snappyCodec)
+        dataFileWriter.create(schema, out)
+        dataFileWriter
+    }
+})
+
+val stream: DataStream[Address] = ...
+stream.addSink(StreamingFileSink.forBulkFormat(
+    outputBasePath,
+    factory).build())
+{% endhighlight %}
+</div>
+</div>
+
 #### ORC Format
 
 To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/orc/writers/OrcBulkWriterFactory.html)
