aljoscha commented on a change in pull request #14061: URL: https://github.com/apache/flink/pull/14061#discussion_r522913331
########## File path: docs/dev/connectors/file_sink.md ########## @@ -0,0 +1,808 @@ +--- +title: "File Sink" +nav-title: File Sink +nav-parent_id: connectors +nav-pos: 5 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems +supported by the [Flink `FileSystem` abstraction]({{ site.baseurl}}/ops/filesystems/index.html). It provides the +same guarantees for both `BATCH` and `STREAMING` execution by exposing the same API while having different runtime Review comment: It's not the `FileSink` that does this. The Flink runtime is responsible for this. ########## File path: docs/dev/connectors/file_sink.md ########## @@ -0,0 +1,808 @@ +--- +title: "File Sink" +nav-title: File Sink +nav-parent_id: connectors +nav-pos: 5 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems +supported by the [Flink `FileSystem` abstraction]({{ site.baseurl}}/ops/filesystems/index.html). It provides the +same guarantees for both `BATCH` and `STREAMING` execution by exposing the same API while having different runtime +implementations for each mode. + +The file sink writes incoming data into buckets. Given that the incoming streams can be unbounded, +data in each bucket is organized into part files of finite size. The bucketing behaviour is fully configurable +with a default time-based bucketing where we start writing a new bucket every hour. This means that each resulting +bucket will contain files with records received during 1 hour intervals from the stream. + +Data within the bucket directories is split into part files. Each bucket will contain at least one part file for +each subtask of the sink that has received data for that bucket. Additional part files will be created according to the configurable +rolling policy. 
For `Row-encoded Formats` (see [File Formats](#file-formats)) the default policy rolls part files based +on size, a timeout that specifies the maximum duration for which a file can be open, and a maximum inactivity +timeout after which the file is closed. For `Bulk-encoded Formats` we roll on every checkpoint and the user can +specify additional conditions based on size or time. + + <div class="alert alert-info"> + <b>IMPORTANT:</b> Checkpointing needs to be enabled when using the `FileSink` in `STREAMING` mode. Part files + can only be finalized on successful checkpoints. If checkpointing is disabled part files will forever stay + in `in-progress` or `pending` state and cannot be safely read by downstream systems. + </div> + + <img src="{{ site.baseurl }}/fig/streamfilesink_bucketing.png" class="center" style="width: 100%;" /> + +## File Formats + +The `FileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org). +These two variants come with their respective builders that can be created with the following static methods: + + - Row-encoded sink: `FileSink.forRowFormat(basePath, rowEncoder)` + - Bulk-encoded sink: `FileSink.forBulkFormat(basePath, bulkWriterFactory)` + +When creating either a row or a bulk encoded sink we have to specify the base path where the buckets will be +stored and the encoding logic for our data. + +Please check out the JavaDoc for [FileSink]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/file/sink/FileSink.html) +for all the configuration options and more documentation about the implementation of the different data formats. + +### Row-encoded Formats + +Row-encoded formats need to specify an [Encoder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/Encoder.html) +that is used for serializing individual rows to the `OutputStream` of the in-progress part files. 
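As a purely illustrative sketch (the `Event` type and its `getId()`/`getValue()` accessors are assumed here and are not part of Flink), a custom `Encoder` that writes one CSV line per record could look like this:

{% highlight java %}
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch only: serializes each record as a single CSV line into the in-progress part file.
public class CsvEventEncoder implements Encoder<Event> {

    @Override
    public void encode(Event element, OutputStream stream) throws IOException {
        String line = element.getId() + "," + element.getValue() + "\n";
        stream.write(line.getBytes(StandardCharsets.UTF_8));
    }
}
{% endhighlight %}

Such an encoder would be passed to `FileSink.forRowFormat(basePath, new CsvEventEncoder())` in place of the `SimpleStringEncoder` used in the example below.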
+ +In addition to the bucket assigner, the [RowFormatBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/file/sink/FileSink.RowFormatBuilder.html) allows the user to specify: + + - Custom [RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html) : Rolling policy to override the DefaultRollingPolicy + - bucketCheckInterval (default = 1 min) : Millisecond interval for checking time-based rolling policies + +Basic usage for writing String elements thus looks like this: + + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; + +import java.util.concurrent.TimeUnit; + +DataStream<String> input = ...; + +final FileSink<String> sink = FileSink + .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) + .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) + .withMaxPartSize(1024 * 1024 * 1024) + .build()) + .build(); + +input.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.api.common.serialization.SimpleStringEncoder +import org.apache.flink.core.fs.Path +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy + +import java.util.concurrent.TimeUnit + +val input: DataStream[String] = ... + +val sink: FileSink[String] = FileSink + .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) + .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) + .withMaxPartSize(1024 * 1024 * 1024) + .build()) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +This example creates a simple sink that assigns records to the default one-hour time buckets. It also specifies +a rolling policy that rolls the in-progress part file on any of the following 3 conditions: + + - It contains at least 15 minutes worth of data + - It hasn't received new records for the last 5 minutes + - The file size has reached 1 GB (after writing the last record) + +### Bulk-encoded Formats + +Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of +specifying an `Encoder` we have to specify a [BulkWriter.Factory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html). +The `BulkWriter` logic defines how new elements are added and flushed, and how a bulk of records +is finalized for further encoding purposes.
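To make that contract concrete, the following is a deliberately naive sketch of a `BulkWriter.Factory` (newline-delimited strings stand in for a real bulk encoding, which would normally buffer and encode batches in a format-specific way):

{% highlight java %}
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch only: shows where addElement / flush / finish fit into the BulkWriter lifecycle.
public class NewlineBulkWriterFactory implements BulkWriter.Factory<String> {

    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        return new BulkWriter<String>() {
            @Override
            public void addElement(String element) throws IOException {
                out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public void flush() throws IOException {
                out.flush();
            }

            @Override
            public void finish() throws IOException {
                // Called when the current part file is about to be closed;
                // the sink itself closes the underlying stream.
                flush();
            }
        };
    }
}
{% endhighlight %}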
+ +Flink comes with five built-in BulkWriter factories: + + - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html) + - [AvroWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriterFactory.html) + - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html) + - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html) + - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html) + +<div class="alert alert-info"> + <b>IMPORTANT:</b> Bulk Formats can only have a rolling policy that extends the `CheckpointRollingPolicy`. + The latter rolls on every checkpoint. In addition to that, a policy can roll (additionally) based on size Review comment: "in addition to that" and "additionally" ########## File path: docs/dev/connectors/file_sink.md ########## @@ -0,0 +1,808 @@
+ +#### Parquet format + +Flink contains built-in convenience methods for creating Parquet writer factories for Avro data. These methods +and their associated documentation can be found in the [ParquetAvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html) class. + +For writing to other Parquet-compatible data formats, users need to create the `ParquetWriterFactory` with a custom implementation of the [ParquetBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetBuilder.html) interface.
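As a rough sketch of that approach (reusing Apache Parquet's Avro support purely for illustration; only `ParquetBuilder` and `ParquetWriterFactory` below are Flink classes), a custom builder could be wired up like this:

{% highlight java %}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

import java.io.IOException;

// Sketch only: creates a snappy-compressed, Avro-backed ParquetWriter for each part file.
public class SnappyAvroParquetBuilder implements ParquetBuilder<GenericRecord> {

    private final String schemaString;

    public SnappyAvroParquetBuilder(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public ParquetWriter<GenericRecord> createWriter(OutputFile out) throws IOException {
        return AvroParquetWriter.<GenericRecord>builder(out)
            .withSchema(new Schema.Parser().parse(schemaString))
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build();
    }
}
{% endhighlight %}

The resulting builder would then be handed to `new ParquetWriterFactory<>(new SnappyAvroParquetBuilder(schemaString))` and passed to `FileSink.forBulkFormat(...)`.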
+ +To use the Parquet bulk encoder in your application you need to add the following dependency: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parquet{{ site.scala_version_suffix }}</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +A `FileSink` that writes Avro data to Parquet format can be created like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.formats.parquet.avro.ParquetAvroWriters; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + + +Schema schema = ...; +DataStream<GenericRecord> stream = ...; + +final FileSink<GenericRecord> sink = FileSink + .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema)) + .build(); + +stream.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.formats.parquet.avro.ParquetAvroWriters +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord + +val schema: Schema = ... +val input: DataStream[GenericRecord] = ... + +val sink: FileSink[GenericRecord] = FileSink + .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema)) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +Similarly, a `FileSink` that writes Protobuf data to Parquet format can be created like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters; + +// ProtoRecord is a generated protobuf Message class. +DataStream<ProtoRecord> stream = ...; + +final FileSink<ProtoRecord> sink = FileSink + .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class)) + .build(); + +stream.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters + +// ProtoRecord is a generated protobuf Message class. +val input: DataStream[ProtoRecord] = ... + +val sink: FileSink[ProtoRecord] = FileSink + .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord])) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +#### Avro format + +Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create +Avro writer factories and their associated documentation can be found in the +[AvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroWriters.html) class.
+ +To use the Avro writers in your application you need to add the following dependency: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +A `FileSink` that writes data to Avro files can be created like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.formats.avro.AvroWriters; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + + +Schema schema = ...; +DataStream<GenericRecord> stream = ...; + +final FileSink<GenericRecord> sink = FileSink + .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema)) + .build(); + +stream.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.formats.avro.AvroWriters +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord + +val schema: Schema = ... +val input: DataStream[GenericRecord] = ... + +val sink: FileSink[GenericRecord] = FileSink + .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema)) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +For creating customized Avro writers, e.g. enabling compression, users need to create the `AvroWriterFactory` +with a custom implementation of the [AvroBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/avro/AvroBuilder.html) interface: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> { + Schema schema = ReflectData.get().getSchema(Address.class); + DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema); + + DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.setCodec(CodecFactory.snappyCodec()); + dataFileWriter.create(schema, out); + return dataFileWriter; +}); + +DataStream<Address> stream = ...; +stream.sinkTo(FileSink.forBulkFormat( + outputBasePath, + factory).build()); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() { + override def createWriter(out: OutputStream): DataFileWriter[Address] = { + val schema = ReflectData.get.getSchema(classOf[Address]) + val datumWriter = new ReflectDatumWriter[Address](schema) + + val dataFileWriter = new DataFileWriter[Address](datumWriter) + dataFileWriter.setCodec(CodecFactory.snappyCodec) + dataFileWriter.create(schema, out) + dataFileWriter + } +}) + +val stream: DataStream[Address] = ... +stream.sinkTo(FileSink.forBulkFormat( + outputBasePath, + factory).build()) +{% endhighlight %} +</div> +</div> + +#### ORC Format + +To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/orc/writers/OrcBulkWriterFactory.html) +which takes a concrete implementation of [Vectorizer]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/vector/Vectorizer.html). + +Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses +ORC's `VectorizedRowBatch` to achieve this.
+ +Since the input element has to be transformed to a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer` +class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method provides an +instance of `VectorizedRowBatch` to be used directly, so users only have to write the logic that transforms the +input `element` into `ColumnVector`s and sets them in the provided `VectorizedRowBatch` instance. + +For example, if the input element is of type `Person`, which looks like: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +class Person { + private final String name; + private final int age; + ... +} + +{% endhighlight %} +</div> +</div> + +Then a child implementation that converts elements of type `Person` and sets them in the `VectorizedRowBatch` can look like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.orc.vector.Vectorizer; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +public class PersonVectorizer extends Vectorizer<Person> implements Serializable { + public PersonVectorizer(String schema) { + super(schema); + } + @Override + public void vectorize(Person element, VectorizedRowBatch batch) throws IOException { + BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0]; + LongColumnVector ageColVector = (LongColumnVector) batch.cols[1]; + int row = batch.size++; + nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8)); + ageColVector.vector[row] = element.getAge(); + } +} + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import java.nio.charset.StandardCharsets +import org.apache.flink.orc.vector.Vectorizer +import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector, VectorizedRowBatch} + +class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { + + override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = { + val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector] + val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector] + val row = batch.size + batch.size += 1 + nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8)) + ageColVector.vector(row) = element.getAge + } + +} + +{% endhighlight %} +</div> +</div> + +To use the ORC bulk encoder in an application, users need to add the following dependency: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-orc{{ site.scala_version_suffix }}</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +And then a `FileSink` that writes data in ORC format can be created like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.orc.writer.OrcBulkWriterFactory; + +String schema = "struct<_col0:string,_col1:int>"; +DataStream<Person> stream = ...; + +final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema)); + +final FileSink<Person> sink = FileSink + .forBulkFormat(outputBasePath, writerFactory) + .build(); + +stream.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.orc.writer.OrcBulkWriterFactory
+ +val schema: String = "struct<_col0:string,_col1:int>" +val input: DataStream[Person] = ... +val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema)) + +val sink: FileSink[Person] = FileSink + .forBulkFormat(outputBasePath, writerFactory) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +The `OrcBulkWriterFactory` can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC +writer properties can be provided. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +String schema = ...; +Configuration conf = ...; +Properties writerProperties = new Properties(); + +writerProperties.setProperty("orc.compress", "LZ4"); +// Other ORC supported properties can also be set similarly. + +final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>( + new PersonVectorizer(schema), writerProperties, conf); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val schema: String = ... +val conf: Configuration = ... +val writerProperties: Properties = new Properties() + +writerProperties.setProperty("orc.compress", "LZ4") +// Other ORC supported properties can also be set similarly. + +val writerFactory = new OrcBulkWriterFactory( + new PersonVectorizer(schema), writerProperties, conf) +{% endhighlight %} +</div> +</div> + +The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html). + +Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overridden +`vectorize(...)` method. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +public class PersonVectorizer extends Vectorizer<Person> implements Serializable { + @Override + public void vectorize(Person element, VectorizedRowBatch batch) throws IOException { + ... + String metadataKey = ...; + ByteBuffer metadataValue = ...; + this.addUserMetadata(metadataKey, metadataValue); + } +} + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { + + override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = { + ... + val metadataKey: String = ... + val metadataValue: ByteBuffer = ...
+ addUserMetadata(metadataKey, metadataValue) + } + +} + +{% endhighlight %} +</div> +</div> + +#### Hadoop SequenceFile format + +To use the `SequenceFile` bulk encoder in your application you need to add the following dependency: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sequence-file</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +A simple `SequenceFile` writer can be created like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; + + +DataStream<Tuple2<LongWritable, Text>> input = ...; +Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); +final FileSink<Tuple2<LongWritable, Text>> sink = FileSink + .forBulkFormat( + outputBasePath, + new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class)) + .build(); + +input.sinkTo(sink); + +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.configuration.GlobalConfiguration +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.SequenceFile +import org.apache.hadoop.io.Text + +val input: DataStream[(LongWritable, Text)] = ... +val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()) +val sink: FileSink[(LongWritable, Text)] = FileSink + .forBulkFormat( + outputBasePath, + new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text])) + .build() + +input.sinkTo(sink) + +{% endhighlight %} +</div> +</div> + +The `SequenceFileWriterFactory` supports additional constructor parameters to specify compression settings. + +## Bucket Assignment + +The bucketing logic defines how the data will be structured into subdirectories inside the base output directory. + +Both row and bulk formats (see [File Formats](#file-formats)) use the [DateTimeBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html) as the default assigner. +By default the `DateTimeBucketAssigner` creates hourly buckets based on the system default timezone +with the following format: `yyyy-MM-dd--HH`. Both the date format (*i.e.* bucket size) and timezone can be +configured manually. + +We can specify a custom [BucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html) by calling `.withBucketAssigner(assigner)` on the format builders.
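For example, a custom assigner that derives the bucket (sub-directory) name from the record itself rather than from time could be sketched as follows (the `Event` type and its `getKey()` accessor are assumed for illustration):

{% highlight java %}
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch only: buckets records by an assumed key field instead of by time.
public class KeyBucketAssigner implements BucketAssigner<Event, String> {

    @Override
    public String getBucketId(Event element, BucketAssigner.Context context) {
        // The returned string becomes the bucket directory under the base path.
        return element.getKey();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
{% endhighlight %}

Such an assigner would be registered with `.withBucketAssigner(new KeyBucketAssigner())` on either the row or the bulk format builder.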
+ +Flink comes with two built-in BucketAssigners: + + - [DateTimeBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html) : Default time based assigner + - [BasePathBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.html) : Assigner that stores all part files in the base path (single global bucket) + +## Rolling Policy + +The [RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html) defines when a given in-progress part file will be closed and moved to the pending and later to finished state. +Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure. +In `STREAMING` mode, the Rolling Policy in combination with the checkpointing interval (pending files become finished on the next checkpoint) control how quickly +part files become available for downstream readers and also the size and number of these parts. In `BATCH` mode, part-files become visible at the end of the job but +the rolling policy can control their maximum size. + +Flink comes with two built-in RollingPolicies: + + - [DefaultRollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html) + - [OnCheckpointRollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html) + +## Part file lifecycle + +In order to use the output of the `FileSink` in downstream systems, we need to understand the naming and lifecycle of the output files produced. + +Part files can be in one of three states: + 1. **In-progress** : The part file that is currently being written to is in-progress + 2. **Pending** : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed + 3. **Finished** : On successful checkpoints (`STREAMING`) or at the end of input (`BATCH`) pending files transition to "Finished" + +Only finished files are safe to read by downstream systems as those are guaranteed to not be modified later. + +Each writer subtask will have a single in-progress part file at any given time for every active bucket, but there can be several pending and finished files. + +**Part file example** + +To better understand the lifecycle of these files let's look at a simple example with 2 sink subtasks: + +``` +└── 2019-08-25--12 + ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334 + └── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575 +``` + +When the part file `part-81fc4980-a6af-41c8-9937-9939408a734b-0` is rolled (let's say it becomes too large), it becomes pending but it is not renamed. 
The sink then opens a new part file: `part-81fc4980-a6af-41c8-9937-9939408a734b-1`: + +``` +└── 2019-08-25--12 + ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334 + ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575 + └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11 +``` + +As `part-81fc4980-a6af-41c8-9937-9939408a734b-0` is now pending completion, after the next successful checkpoint, it is finalized: + +``` +└── 2019-08-25--12 + ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334 + ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0 + └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11 +``` + +New buckets are created as dictated by the bucketing policy, and this doesn't affect currently in-progress files: + +``` +└── 2019-08-25--12 + ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334 + ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0 + └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11 +└── 2019-08-25--13 + └── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1 +``` + +Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis. + +### Part file configuration + +Finished files can be distinguished from the in-progress ones by their naming scheme only. + +By default, the file naming strategy is as follows: + - **In-progress / Pending**: `part-<subtask-uid>-<partFileIndex>.inprogress.uid` Review comment: Should it be `<subtask>-<uid>`, or is it just `part-<uid>-...`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org