kl0u commented on a change in pull request #9581: [FLINK-13864][streaming]:
Modify the StreamingFileSink Builder interface to allow for easier subclassing
of StreamingFileSink
URL: https://github.com/apache/flink/pull/9581#discussion_r321709238
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -162,47 +162,54 @@ protected StreamingFileSink(
* @return The builder where the remaining of the configuration
parameters for the sink can be configured.
* In order to instantiate the sink, call {@link
RowFormatBuilder#build()} after specifying the desired parameters.
*/
- public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String>
forBulkFormat(
+ public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String, ?
extends BulkFormatBuilder<IN, String, ?>> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN>
writerFactory) {
return new StreamingFileSink.BulkFormatBuilder<>(basePath,
writerFactory, new DateTimeBucketAssigner<>());
}
/**
* The base abstract class for the {@link RowFormatBuilder} and {@link
BulkFormatBuilder}.
*/
- protected abstract static class BucketsBuilder<IN, BucketID> implements
Serializable {
+ protected abstract static class BucketsBuilder<IN, BucketID, T extends
BucketsBuilder<IN, BucketID, T>> implements Serializable {
private static final long serialVersionUID = 1L;
+ protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L
* 1000L;
+
+ @SuppressWarnings("unchecked")
+ protected T self() {
+ return (T) this;
+ }
+
abstract Buckets<IN, BucketID> createBuckets(final int
subtaskIndex) throws IOException;
}
/**
* A builder for configuring the sink for row-wise encoding formats.
*/
@PublicEvolving
- public static class RowFormatBuilder<IN, BucketID> extends
StreamingFileSink.BucketsBuilder<IN, BucketID> {
+ public static class RowFormatBuilder<IN, BucketID, T extends
RowFormatBuilder<IN, BucketID, T>> extends StreamingFileSink.BucketsBuilder<IN,
BucketID, T> {
private static final long serialVersionUID = 1L;
- private final long bucketCheckInterval;
+ protected long bucketCheckInterval;
Review comment:
This field (`bucketCheckInterval`) can be `private`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services