kl0u commented on a change in pull request #9581: [FLINK-13864][streaming]:
Modify the StreamingFileSink Builder interface to allow for easier subclassing
of StreamingFileSink
URL: https://github.com/apache/flink/pull/9581#discussion_r321710154
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -224,35 +231,52 @@ private RowFormatBuilder(
this.partFileSuffix =
Preconditions.checkNotNull(partFileSuffix);
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketCheckInterval(final long interval) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, rollingPolicy, interval, bucketFactory, partFilePrefix,
partFileSuffix);
+ public T withBucketCheckInterval(final long interval) {
+ this.bucketCheckInterval = interval;
+ return self();
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
- return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval,
bucketFactory, partFilePrefix, partFileSuffix);
+ public T withBucketAssigner(final BucketAssigner<IN, BucketID>
assigner) {
+ this.bucketAssigner =
Preconditions.checkNotNull(assigner);
+ return self();
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval,
bucketFactory, partFilePrefix, partFileSuffix);
+ public T withRollingPolicy(final RollingPolicy<IN, BucketID>
policy) {
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
+ return self();
}
- public <ID> StreamingFileSink.RowFormatBuilder<IN, ID>
withBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final
RollingPolicy<IN, ID> policy) {
- return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix,
partFileSuffix);
+ public T withBucketAssignerAndPolicy(final BucketAssigner<IN,
BucketID> assigner, final RollingPolicy<IN, BucketID> policy) {
+ this.bucketAssigner =
Preconditions.checkNotNull(assigner);
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
+ return self();
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withPartFilePrefix(final String partPrefix) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, rollingPolicy, bucketCheckInterval, bucketFactory, partPrefix,
partFileSuffix);
+ public T withPartFilePrefix(final String partPrefix) {
+ this.partFilePrefix = partPrefix;
+ return self();
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withPartFileSuffix(final String partSuffix) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, rollingPolicy, bucketCheckInterval, bucketFactory,
partFilePrefix, partSuffix);
+ public T withPartFileSuffix(final String partSuffix) {
+ this.partFileSuffix = partSuffix;
+ return self();
+ }
+
+ public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ?
extends RowFormatBuilder<IN, ID, ?>>
newBuilderWithBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner,
final RollingPolicy<IN, ID> policy) {
+ return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix,
partFileSuffix);
Review comment:
We can add a check `checkState(bucketFactory.getClass() ==
DefaultBucketFactoryImpl.class, "withBucketAssignerAndPolicy() should be called
before specifying a custom BucketFactory.")`. Currently there is no way to
change the `bucketFactory` but it could be added in the future.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services