kl0u commented on a change in pull request #9530: [FLINK-13842][docs] Improve Documentation of the StreamingFileSink
URL: https://github.com/apache/flink/pull/9530#discussion_r319897439
##########
File path: docs/dev/connectors/streamfile_sink.md
##########
@@ -71,55 +131,182 @@ input.addSink(sink);
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import java.util.concurrent.TimeUnit

val input: DataStream[String] = ...
val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
+    .withRollingPolicy(
+        DefaultRollingPolicy.create()
+            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+            .withMaxPartSize(1024 * 1024 * 1024)
+            .build())
    .build()
-
+
input.addSink(sink)
{% endhighlight %}
</div>
</div>
-This will create a streaming sink that creates hourly buckets and uses a
-default rolling policy. The default bucket assigner is
-[DateTimeBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
-and the default rolling policy is
-[DefaultRollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html).
-You can specify a custom
-[BucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
-and
-[RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
-on the sink builder. Please check out the JavaDoc for
-[StreamingFileSink]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)
-for more configuration options and more documentation about the workings and
-interactions of bucket assigners and rolling policies.
-
-#### Using Bulk-encoded Output Formats
-
-In the above example we used an `Encoder` that can encode or serialize each
-record individually. The streaming file sink also supports bulk-encoded output
-formats such as [Apache Parquet](http://parquet.apache.org). To use these,
-instead of `StreamingFileSink.forRowFormat()` you would use
-`StreamingFileSink.forBulkFormat()` and specify a `BulkWriter.Factory`.
-
-[ParquetAvroWriters]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html)
-has static methods for creating a `BulkWriter.Factory` for various types.
+This example creates a simple sink that assigns records to the default one-hour time buckets. It also specifies
+a rolling policy that rolls the in-progress part file on any of the following 3 conditions:
+
+ - It contains at least 15 minutes' worth of data
+ - It hasn't received new records for the last 5 minutes
+ - The file size has reached 1 GB (after writing the last record)
+
+### Bulk-encoded Formats
+
+Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of
+specifying an `Encoder` we have to specify a [BulkWriter.Factory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html).
+The `BulkWriter` logic defines how new elements are added and flushed, and how a bulk of records
+is finalized for further encoding purposes.
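+
+For example, a minimal sketch of a bulk-encoded sink writing Parquet files (this assumes the `flink-parquet` dependency is on the classpath and uses a hypothetical `Person` record type purely for illustration) could look like this:
+
+{% highlight scala %}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+// Hypothetical record type, used here only for illustration.
+case class Person(name: String, age: Int)
+
+val input: DataStream[Person] = ...
+
+// forReflectRecord derives the Avro schema from the class via reflection
+// and returns a BulkWriter.Factory that writes Parquet files.
+val sink: StreamingFileSink[Person] = StreamingFileSink
+    .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(classOf[Person]))
+    .build()
+
+input.addSink(sink)
+{% endhighlight %}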
<div class="alert alert-info">
- <b>IMPORTANT:</b> Bulk-encoding formats can only be combined with the
- `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
- every checkpoint.
+  <b>IMPORTANT:</b> Checkpointing needs to be enabled when using bulk-encoded formats. Currently these
Review comment:
This is true for all formats (also row ones). If you do not have checkpointing enabled, nothing gets finalized. With the intervals, in-progress files will be closed and new ones will be created, but they will never be moved to FINISHED.
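
To make this concrete, a minimal sketch of enabling checkpointing on the environment (the 60-second interval here is just an illustrative value):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Part files transition from in-progress/pending to FINISHED only on a
// successful checkpoint, so the sink cannot finalize output without this.
env.enableCheckpointing(60000) // checkpoint every 60 seconds (illustrative value)
```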