eskabetxe commented on a change in pull request #9533: [FLINK-13850]
[Connectors / FileSystem] Refactor part file configuration into a single method
URL: https://github.com/apache/flink/pull/9533#discussion_r343588005
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -303,36 +295,30 @@ private BulkFormatBuilder(
BucketAssigner<IN, BucketID> assigner,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory,
- String partFilePrefix,
- String partFileSuffix) {
+ PartFileConfig partFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.writerFactory = writerFactory;
this.bucketAssigner =
Preconditions.checkNotNull(assigner);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory =
Preconditions.checkNotNull(bucketFactory);
- this.partFilePrefix =
Preconditions.checkNotNull(partFilePrefix);
- this.partFileSuffix =
Preconditions.checkNotNull(partFileSuffix);
+ this.partFileConfig =
Preconditions.checkNotNull(partFileConfig);
}
public StreamingFileSink.BulkFormatBuilder<IN, BucketID>
withBucketCheckInterval(long interval) {
- return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, interval, bucketFactory, partFilePrefix, partFileSuffix);
+ return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, interval, bucketFactory, partFileConfig);
}
public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID>
withBucketAssigner(BucketAssigner<IN, ID> assigner) {
- return new BulkFormatBuilder<>(basePath, writerFactory,
Preconditions.checkNotNull(assigner), bucketCheckInterval, new
DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+ return new BulkFormatBuilder<>(basePath, writerFactory,
Preconditions.checkNotNull(assigner), bucketCheckInterval, new
DefaultBucketFactoryImpl<>(), partFileConfig);
}
@VisibleForTesting
StreamingFileSink.BulkFormatBuilder<IN, BucketID>
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory),
partFilePrefix, partFileSuffix);
+ return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory),
partFileConfig);
}
- public StreamingFileSink.BulkFormatBuilder<IN, BucketID>
withPartFilePrefix(final String partPrefix) {
- return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, bucketCheckInterval, bucketFactory, partPrefix, partFileSuffix);
- }
-
- public StreamingFileSink.BulkFormatBuilder<IN, BucketID>
withPartFileSuffix(final String partSuffix) {
- return new BulkFormatBuilder<>(basePath, writerFactory,
bucketAssigner, bucketCheckInterval, bucketFactory, partFilePrefix, partSuffix);
+ public StreamingFileSink.BulkFormatBuilder<IN, BucketID>
withPartFileConfig(final PartFileConfig partFileConfig) {
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services