yxu-valleytider commented on a change in pull request #9581: 
[FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to 
allow for easier subclassing of StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#discussion_r320514871
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 ##########
 @@ -314,25 +332,30 @@ private BulkFormatBuilder(
                        this.partFileSuffix = Preconditions.checkNotNull(partFileSuffix);
                }
 
-               public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
-                       return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, interval, bucketFactory, partFilePrefix, partFileSuffix);
+               public T withBucketCheckInterval(long interval) {
+                       this.bucketCheckInterval = interval;
+                       return self();
                }
 
-               public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketAssigner(BucketAssigner<IN, ID> assigner) {
-                       return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+               public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
 
 Review comment:
   @kl0u I did a small experiment with a non-String `BucketID` assigner:
   ```
   class NonStringBucketIdAssigner implements BucketAssigner<Message, Integer> {
       ...
   }
   ```
   and 
   ```
   CustomizedStreamingFileSink.forBulkFormat(...)
               .withBucketAssigner(new NonStringBucketIdAssigner())
               .build();
   ```
   
   The cast appears to succeed as long as the `assigner` parameter can be cast 
to `BucketAssigner`. That makes sense given type erasure: the unbounded type 
parameters are erased at runtime, so the cast cannot check the `BucketID` type 
argument.
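
As a minimal sketch of that erasure effect (all names here, such as `Assigner`, `Builder` and `StringIdBuilder`, are hypothetical stand-ins and not the PR's code): the unchecked cast inside the builder goes through for an `Integer`-typed assigner, and a `ClassCastException` would only surface later, at a call site that reads the bucket ID back as the declared `String` type.

```java
// Hypothetical stand-ins; none of these types are Flink's real classes.
class ErasureCastSketch {

    /** Simplified stand-in for a bucket assigner with a typed bucket ID. */
    interface Assigner<IN, ID> {
        ID getBucketId(IN element);
    }

    /** Assigner whose bucket ID is an Integer rather than a String. */
    static final class NonStringAssigner implements Assigner<String, Integer> {
        @Override
        public Integer getBucketId(String element) {
            return element.length();
        }
    }

    /** Self-typed builder whose ID type is fixed by the class declaration. */
    static class Builder<IN, ID, T extends Builder<IN, ID, T>> {
        private Assigner<IN, ID> assigner;

        @SuppressWarnings("unchecked")
        T self() {
            return (T) this;
        }

        // Accepts any ID type and casts it to the builder's own ID type.
        // After erasure both sides are just the raw Assigner interface,
        // so this cast cannot fail at runtime, whatever the actual ID type is.
        @SuppressWarnings("unchecked")
        T withAssigner(Assigner<IN, ?> newAssigner) {
            this.assigner = (Assigner<IN, ID>) newAssigner;
            return self();
        }

        // Typed accessor: callers that know ID get a compiler-inserted cast.
        ID assign(IN element) {
            return assigner.getBucketId(element);
        }

        // Untyped accessor: no cast is involved anywhere on this path.
        Object assignErased(IN element) {
            return assigner.getBucketId(element);
        }
    }

    /** Concrete builder that declares String bucket IDs. */
    static final class StringIdBuilder extends Builder<String, String, StringIdBuilder> {}

    /** The unchecked cast goes through and the Integer ID comes back intact. */
    static Object assignAfterUncheckedCast() {
        StringIdBuilder builder = new StringIdBuilder().withAssigner(new NonStringAssigner());
        return builder.assignErased("flink");
    }

    /** Reading the ID as the declared String type is where the failure shows up. */
    static boolean failsWhenReadAsDeclaredType() {
        StringIdBuilder builder = new StringIdBuilder().withAssigner(new NonStringAssigner());
        try {
            String id = builder.assign("flink"); // cast to String happens here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("id after unchecked cast: " + assignAfterUncheckedCast());
        System.out.println("typed read fails: " + failsWhenReadAsDeclaredType());
    }
}
```

Whether a typed read like the one in `failsWhenReadAsDeclaredType` ever occurs in the real sink is not something this sketch establishes; it only shows why the cast itself does not fail.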
   
   I am also updating the PR to reflect this in the cast statement. If the 
cast is truly a concern, we could keep the original method under a different 
name, e.g., `newBuilderWithBucketAssigner`. 
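
For comparison, the cast-free alternative could look roughly like the sketch below. It is only an illustration under assumed, simplified types (`Assigner`, `Builder` and `newBuilderWithAssigner` are hypothetical names, not the PR's code): the method returns a new builder parameterized with the new ID type, the way the removed `withBucketAssigner` did, so the compiler tracks the bucket ID type and no unchecked cast is needed.

```java
import java.util.Objects;

// Hypothetical stand-ins; none of these types are Flink's real classes.
class RebuildingBuilderSketch {

    /** Simplified stand-in for a bucket assigner with a typed bucket ID. */
    interface Assigner<IN, ID> {
        ID getBucketId(IN element);
    }

    /** Builder whose ID type can change only by creating a new builder. */
    static final class Builder<IN, ID> {
        private final Assigner<IN, ID> assigner;
        private long checkInterval;

        Builder(Assigner<IN, ID> assigner, long checkInterval) {
            this.assigner = Objects.requireNonNull(assigner);
            this.checkInterval = checkInterval;
        }

        // Same ID type: mutate in place and return this builder.
        Builder<IN, ID> withCheckInterval(long interval) {
            this.checkInterval = interval;
            return this;
        }

        // Different ID type: copy the settings into a new, correctly typed builder.
        <NEW_ID> Builder<IN, NEW_ID> newBuilderWithAssigner(Assigner<IN, NEW_ID> newAssigner) {
            return new Builder<IN, NEW_ID>(newAssigner, checkInterval);
        }

        ID assign(IN element) {
            return assigner.getBucketId(element);
        }

        long checkInterval() {
            return checkInterval;
        }
    }

    /** Switches from String IDs to Integer IDs without any cast. */
    static Builder<String, Integer> demo() {
        Assigner<String, String> identity = s -> s;
        Assigner<String, Integer> byLength = s -> s.length();
        Builder<String, String> base =
                new Builder<String, String>(identity, 1L).withCheckInterval(60_000L);
        return base.newBuilderWithAssigner(byLength);
    }

    public static void main(String[] args) {
        Builder<String, Integer> builder = demo();
        Integer id = builder.assign("flink"); // statically typed as Integer
        System.out.println(id + " " + builder.checkInterval());
    }
}
```

The trade-off is that subclasses lose the fluent `T`-returning shape for this one method, since the returned builder type depends on the new ID type.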

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
