yxu-valleytider commented on a change in pull request #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink URL: https://github.com/apache/flink/pull/9581#discussion_r326002736
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ########## @@ -162,47 +162,54 @@ protected StreamingFileSink( * @return The builder where the remaining of the configuration parameters for the sink can be configured. * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters. */ - public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat( + public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String, ? extends BulkFormatBuilder<IN, String, ?>> forBulkFormat( final Path basePath, final BulkWriter.Factory<IN> writerFactory) { return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>()); } /** * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */ - protected abstract static class BucketsBuilder<IN, BucketID> implements Serializable { + protected abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>> implements Serializable { private static final long serialVersionUID = 1L; + protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L; + + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + abstract Buckets<IN, BucketID> createBuckets(final int subtaskIndex) throws IOException; } /** * A builder for configuring the sink for row-wise encoding formats. */ @PublicEvolving - public static class RowFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> { + public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>> extends StreamingFileSink.BucketsBuilder<IN, BucketID, T> { private static final long serialVersionUID = 1L; - private final long bucketCheckInterval; + protected long bucketCheckInterval; Review comment: DONE. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services