kl0u commented on a change in pull request #9228: [FLINK-13428][Connectors / 
FileSystem] allow part file names to be configurable
URL: https://github.com/apache/flink/pull/9228#discussion_r309677978
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 ##########
 @@ -207,29 +211,37 @@ private RowFormatBuilder(
                                BucketAssigner<IN, BucketID> assigner,
                                RollingPolicy<IN, BucketID> policy,
                                long bucketCheckInterval,
-                               BucketFactory<IN, BucketID> bucketFactory) {
+                               BucketFactory<IN, BucketID> bucketFactory,
+                               String partPrefix,
+                               String partSuffix) {
                        this.basePath = Preconditions.checkNotNull(basePath);
                        this.encoder = Preconditions.checkNotNull(encoder);
                        this.bucketAssigner = 
Preconditions.checkNotNull(assigner);
                        this.rollingPolicy = Preconditions.checkNotNull(policy);
                        this.bucketCheckInterval = bucketCheckInterval;
                        this.bucketFactory = 
Preconditions.checkNotNull(bucketFactory);
+                       this.partPrefix = 
Preconditions.checkNotNull(partPrefix);
+                       this.partSuffix = 
Preconditions.checkNotNull(partSuffix);
                }
 
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketCheckInterval(final long interval) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, interval, bucketFactory);
+                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, interval, bucketFactory, partPrefix, partSuffix);
                }
 
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, 
bucketFactory);
+                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, 
bucketFactory, partPrefix, partSuffix);
                }
 
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, 
bucketFactory);
+                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, 
bucketFactory, partPrefix, partSuffix);
                }
 
                public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final 
RollingPolicy<IN, ID> policy) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactoryImpl<>());
+                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partPrefix, partSuffix);
+               }
+
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withPartFileConfig(String partPrefix) {
 
 Review comment:
   Both builders should have the following methods:
   
   ```
   public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withPartFilePrefix(final String partPrefix) {
                        return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, bucketCheckInterval, bucketFactory, partPrefix, 
partSuffix);
                }
   
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withPartFileSuffix(final String partSuffix) {
                        return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, bucketCheckInterval, bucketFactory, partPrefix, 
partSuffix);
                }
   ```
   
   and these parameters will be encapsulated into a `PartFileConfig` object in 
their respective `createBuckets()` method. The latter object can then be passed 
to the `StreamingFileSink` constructor.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to