pltbkd commented on a change in pull request #18680:
URL: https://github.com/apache/flink/pull/18680#discussion_r804443649
##########
File path:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
}
+ public BucketWriter<IN, String> createBucketWriter() throws IOException {
+ return bucketsBuilder.createBucketWriter();
+ }
+
+ public FileCompactor getFileCompactor() {
+ return bucketsBuilder.getFileCompactor();
+ }
+
+ @Override
+ public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+ DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+ FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+ if (strategy == null) {
+ // not enabled
+ return committableStream;
+ }
+
+ SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+ committableStream
+ .rescale()
Review comment:
I found that we have to set a partitioning strategy explicitly here; otherwise
the graph is generated with the "forward" partitioner. Rebalance would work just
as well as rescale, of course.
The problem is that the partition transformation is generated before the sink is
expanded, and it uses the forward partitioner because the writer and the
committer have the same parallelism. Since the parallelism of the coordinator is
fixed to 1, which is probably not the same as that of the writers, the partition
transformation must be overridden.
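To illustrate the point above, here is a toy model in plain Java (not Flink code). The class `PartitionerChoice`, its enum, and both methods are hypothetical names; the model only assumes what is described above: the edge gets "forward" when both sides have the same parallelism, and forward cannot connect operators with different parallelism. The choice of `REBALANCE` for the unequal case is an assumption of the sketch.

```java
public class PartitionerChoice {
    // Hypothetical stand-ins for the partitioners discussed in this comment.
    public enum Partitioner { FORWARD, RESCALE, REBALANCE }

    // Assumed default: forward when both sides have equal parallelism,
    // otherwise a redistributing partitioner.
    public static Partitioner defaultFor(int upstream, int downstream) {
        return upstream == downstream ? Partitioner.FORWARD : Partitioner.REBALANCE;
    }

    // Forward is a one-to-one connection, so it only works for equal parallelism.
    public static boolean isValid(Partitioner p, int upstream, int downstream) {
        return p != Partitioner.FORWARD || upstream == downstream;
    }

    public static void main(String[] args) {
        int writers = 4, committers = 4, coordinator = 1;
        // The edge is created while the downstream is still the committer.
        Partitioner generated = defaultFor(writers, committers);
        System.out.println("generated: " + generated);
        // After expansion the downstream is the coordinator with parallelism 1.
        System.out.println("still valid: " + isValid(generated, writers, coordinator));
        System.out.println("rescale valid: "
                + isValid(Partitioner.RESCALE, writers, coordinator));
    }
}
```

In this model the generated forward edge stops being valid once the coordinator is the receiver, which is why the explicit `.rescale()` (or rebalance) is needed in the diff.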
By the way, I found another related issue. The compactor operators and the
committers now also have to have the same parallelism. In the current FileSink
design, a committableSummary and its committables are sent sequentially from
the writer to the committer, which means that every message of a sequence must
go to one fixed receiver. So the parallelism of the writers must be equal to
(when using the forward partitioner) or greater than (when using the rescale
partitioner) that of the committers. For the same reason, the parallelism of the
compactor operators is also restricted.
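The "fixed receiver" constraint can be sketched with a simplified model of a rescale-style, pointwise connection. This is plain Java, not Flink's implementation; `RescaleModel` and `receiversOf` are hypothetical names, and the index arithmetic is an assumed approximation of how upstream subtasks are grouped onto downstream subtasks.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleModel {
    // Returns the downstream subtask indices that one upstream subtask can
    // send to, under an assumed pointwise (rescale-like) connection pattern.
    public static List<Integer> receiversOf(int upstreamIndex, int upstream, int downstream) {
        List<Integer> receivers = new ArrayList<>();
        if (upstream >= downstream) {
            // Several upstream subtasks share one downstream subtask:
            // exactly one fixed receiver per sender.
            receivers.add(upstreamIndex * downstream / upstream);
        } else {
            // One upstream subtask fans out to several downstream subtasks,
            // so consecutive records may land on different receivers.
            for (int j = 0; j < downstream; j++) {
                if (j * upstream / downstream == upstreamIndex) {
                    receivers.add(j);
                }
            }
        }
        return receivers;
    }

    // A summary followed by its committables stays together only if the
    // sender has a single possible receiver.
    public static boolean sequenceStaysTogether(int upstreamIndex, int upstream, int downstream) {
        return receiversOf(upstreamIndex, upstream, downstream).size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(receiversOf(3, 4, 2));
        System.out.println(receiversOf(0, 2, 4));
    }
}
```

With 4 senders and 2 receivers every sender has exactly one target, so a summary and its committables arrive at the same subtask. With 2 senders and 4 receivers each sender has two targets, and the sequence can be split across them.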