pltbkd commented on a change in pull request #18680:
URL: https://github.com/apache/flink/pull/18680#discussion_r804443649



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    public BucketWriter<IN, String> createBucketWriter() throws IOException {
+        return bucketsBuilder.createBucketWriter();
+    }
+
+    public FileCompactor getFileCompactor() {
+        return bucketsBuilder.getFileCompactor();
+    }
+
+    @Override
+    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+        FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+        if (strategy == null) {
+            // not enabled
+            return committableStream;
+        }
+
+        SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+                committableStream
+                        .rescale()

Review comment:
       I found that we have to set a partitioner explicitly here; otherwise the graph is generated with "forward". Rebalance would also work in place of rescale, of course.
   
   The problem is that the partition transformation is already generated before the sink is expanded, and its partitioner is forward because the writer and the committer have the same parallelism. Since the coordinator's parallelism is fixed to 1, which most likely differs from that of the writers, the partition transformation must be overridden.
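   To make the failure mode concrete, here is a minimal plain-Java model (not Flink API; the `ForwardEdgeModel` class, its `Partitioner` enum and `isValid` method are hypothetical). It assumes the partitioner was fixed to FORWARD while the writer and committer parallelisms matched, and shows that the same edge stops being valid once its downstream is the single-parallelism coordinator, unless it is overridden explicitly as `.rescale()` does in the diff.

```java
/**
 * Hypothetical model, not Flink code: forward pairs upstream subtask i with
 * downstream subtask i, so it is only valid when both sides have equal parallelism.
 */
public class ForwardEdgeModel {

    enum Partitioner { FORWARD, RESCALE, REBALANCE }

    /** Forward requires equal parallelism; the other partitioners tolerate a change. */
    static boolean isValid(Partitioner p, int upstreamParallelism, int downstreamParallelism) {
        return p != Partitioner.FORWARD || upstreamParallelism == downstreamParallelism;
    }

    public static void main(String[] args) {
        int writers = 4;
        int committers = 4;
        int coordinator = 1; // the coordinator's parallelism is fixed to 1

        // Assumed: forward was chosen before expansion because writers == committers.
        Partitioner derived = Partitioner.FORWARD;

        System.out.println(isValid(derived, writers, committers));  // true
        // Same edge after expansion, now pointing at the coordinator.
        System.out.println(isValid(derived, writers, coordinator)); // false
        // Overriding the partitioner explicitly makes the edge valid again.
        System.out.println(isValid(Partitioner.RESCALE, writers, coordinator)); // true
    }
}
```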
   
   By the way, I found another related issue: the compactor operators and the committers now also have to run with the same parallelism. In the current FileSink design, the committableSummary and its committables are sent from the writer to the committer as one sequence, which means every message of a sequence must reach the same, fixed receiver. So the parallelism of the writers must be equal to (with the forward partitioner) or greater than (with the rescale partitioner) that of the committers. For the same reason, the parallelism of the compactor operators is restricted as well.
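   The constraint can be illustrated with a simplified plain-Java model of the pointwise wiring that rescale relies on. This is a sketch under assumptions: the `RescaleModel` class and its methods are hypothetical, and the block-index arithmetic approximates how upstream subtasks are grouped onto downstream subtasks rather than reproducing Flink's implementation. It checks whether every sender has exactly one receiver, which is what keeps a summary and its committables together.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of rescale-style pointwise wiring; not Flink code. */
public class RescaleModel {

    /** For each upstream subtask, the downstream subtasks it is wired to. */
    static List<List<Integer>> receivers(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int src = 0; src < upstream; src++) {
            List<Integer> targets = new ArrayList<>();
            if (upstream >= downstream) {
                // Downstream subtask t owns a contiguous block of upstream subtasks,
                // so every upstream subtask falls into exactly one block.
                for (int t = 0; t < downstream; t++) {
                    int start = t * upstream / downstream;
                    int end = (t + 1) * upstream / downstream;
                    if (src >= start && src < end) {
                        targets.add(t);
                    }
                }
            } else {
                // Each upstream subtask fans out to a contiguous range of downstream subtasks.
                int start = (src * downstream + upstream - 1) / upstream;
                int end = ((src + 1) * downstream + upstream - 1) / upstream;
                for (int t = start; t < end; t++) {
                    targets.add(t);
                }
            }
            result.add(targets);
        }
        return result;
    }

    /** A summary and its committables stay on one receiver only if each sender has exactly one. */
    static boolean sequencesStayOnOneReceiver(int upstream, int downstream) {
        for (List<Integer> targets : receivers(upstream, downstream)) {
            if (targets.size() != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(receivers(4, 2)); // [[0], [0], [1], [1]]
        System.out.println(receivers(2, 4)); // [[0, 1], [2, 3]]
        System.out.println(sequencesStayOnOneReceiver(4, 4)); // true (1:1, like forward)
        System.out.println(sequencesStayOnOneReceiver(4, 2)); // true (more writers than committers)
        System.out.println(sequencesStayOnOneReceiver(2, 4)); // false (a writer fans out)
    }
}
```

   Under this model, the same check applies to any stage placed between the writers and the committers, which is why the compactor operators end up with the same restriction.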




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.