JingsongLi commented on code in PR #6460:
URL: https://github.com/apache/paimon/pull/6460#discussion_r2473622845
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java:
##########
@@ -282,26 +340,22 @@ protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
+ " then the parallelism of writerOperator will be set to bucketNums.");
parallelism = bucketNums;
}
- DataStream<InternalRow> partitioned =
- partition(
- input,
- new RowDataChannelComputer(table.schema(), logSinkFunction != null),
- parallelism);
- FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
- return sink.sinkFrom(partitioned);
}
- private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
- ChannelComputer<InternalRow> channelComputer;
- if (!table.partitionKeys().isEmpty()
- && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
- channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+ @Nullable
+ private Integer getPostponeFixedBucketNumber() {
+ // If data exists, use the current bucket number; otherwise, use sink.parallelism if set. If
+ // neither is set, use Flink's parallelism (get at runtime), but here we should use a
+ // default number.
+ List<SimpleFileEntry> simpleFileEntries =
+ table.store().newScan().onlyReadRealBuckets().readSimpleEntries();
Review Comment:
Suppose partition part-1 has 5 buckets and partition part-2 has 10.
If you are now writing to part-2 but take the bucket number from an entry of part-1, what happens?
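The scenario above can be made concrete with a small Java sketch. It compares a "global" lookup, which takes the bucket count from whichever entry the scan returns first, with a partition-aware lookup that applies the fallback chain from the code comment (existing bucket count, then sink.parallelism, then a default) per partition. `FileEntry`, `globalBucketNumber`, `bucketNumberPerPartition`, `resolveBucketNumber` and the `defaultNumber` parameter are all hypothetical names for illustration; `FileEntry` is a plain stand-in and does not claim to mirror Paimon's `SimpleFileEntry`. The sketch also assumes that all files of one partition agree on their bucket count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PostponeBucketSketch {

    /** Hypothetical stand-in for a manifest file entry (not Paimon's SimpleFileEntry). */
    public record FileEntry(String partition, int bucket, int totalBuckets) {}

    /** Naive lookup: uses the bucket count of whichever entry the scan returns first. */
    public static Integer globalBucketNumber(List<FileEntry> entries) {
        return entries.isEmpty() ? null : entries.get(0).totalBuckets();
    }

    /** Partition-aware lookup: records the bucket count of every partition seen in the scan. */
    public static Map<String, Integer> bucketNumberPerPartition(List<FileEntry> entries) {
        Map<String, Integer> result = new HashMap<>();
        for (FileEntry entry : entries) {
            Integer previous = result.putIfAbsent(entry.partition(), entry.totalBuckets());
            // Assumption of this sketch: all files of one partition share one bucket count.
            if (previous != null && previous.intValue() != entry.totalBuckets()) {
                throw new IllegalStateException(
                        "Inconsistent bucket count in partition " + entry.partition());
            }
        }
        return result;
    }

    /**
     * Fallback chain from the code comment, resolved per partition: the partition's existing
     * bucket count, then sink.parallelism if set, then a (hypothetical) default.
     */
    public static int resolveBucketNumber(
            Map<String, Integer> perPartition,
            String partition,
            Integer sinkParallelism,
            int defaultNumber) {
        Integer existing = perPartition.get(partition);
        if (existing != null) {
            return existing;
        }
        if (sinkParallelism != null) {
            return sinkParallelism;
        }
        return defaultNumber;
    }

    public static void main(String[] args) {
        // The reviewer's scenario: part-1 has 5 buckets, part-2 has 10.
        List<FileEntry> entries = List.of(
                new FileEntry("part-1", 0, 5),
                new FileEntry("part-2", 0, 10));
        Map<String, Integer> perPartition = bucketNumberPerPartition(entries);

        // Writing part-2 with the naive lookup picks up part-1's bucket count.
        System.out.println(globalBucketNumber(entries)); // prints 5
        // The partition-aware lookup returns part-2's own bucket count.
        System.out.println(resolveBucketNumber(perPartition, "part-2", null, 1)); // prints 10
        // A partition without data falls back to sink.parallelism (4 here).
        System.out.println(resolveBucketNumber(perPartition, "part-3", 4, 1)); // prints 4
    }
}
```

With the naive lookup, rows for part-2 would be routed as if the partition had 5 buckets, because the result depends only on scan order. Keying the lookup by partition avoids that, at the cost of keeping one entry per partition in memory.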
--
This is an automated message from the Apache Git Service.