[ https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686974#comment-17686974 ]
Jingsong Lee commented on FLINK-30985:
--------------------------------------
[~Ming Li] Thanks for reporting.
This ticket may be related to https://issues.apache.org/jira/browse/FLINK-31008
The reason the same bucket is consumed by the same task is that we need to
guarantee the consumption order within a bucket.
> [Flink][Table Store] Change the Splits allocation algorithm of
> ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30985
> URL: https://issues.apache.org/jira/browse/FLINK-30985
> Project: Flink
> Issue Type: Improvement
> Components: Table Store
> Reporter: ming li
> Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in
> {{TableStore}} works by traversing a {{HashMap}} of bucket splits. Because the
> number of buckets is fixed, the traversal order is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the same bucket is given
>                     // to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // if the reader that requested another split has failed in the
>                         // meantime, remove it from the list of waiting readers
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }{code}
> Assume that a {{task}} consumes multiple {{buckets}} and each {{bucket}} has
> enough splits. Every time the task requests a split, the first of its buckets
> in traversal order is served, so the task's other buckets may not be consumed
> for a long time. This results in uneven consumption and makes it difficult to
> advance the {{watermark}}. So I think we should change the split allocation
> algorithm to a fair one.
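A minimal sketch of the difference, assuming a simplified model rather than the real enumerator: splits are plain strings, there is no {{SplitEnumeratorContext}} or {{readersAwaitingSplit}}, and a {{TreeMap}} stands in for the fixed {{HashMap}} traversal order. The names {{FairSplitAssignment}}, {{pickFixed}}, {{pickFair}}, {{buildSplits}} and {{run}} are hypothetical. The "fair" variant shown is a per-task round-robin over the task's buckets, one possible choice and not necessarily what Table Store adopted. It keeps {{bucket % parallelism}}, so each bucket is still read by exactly one task, preserving the per-bucket ordering described in the comment above.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical simulation, not Table Store code: task 0 of parallelism 2 repeatedly
// requests splits; it owns buckets 0 and 2 because task = bucket % parallelism.
class FairSplitAssignment {

    // Current behaviour: scan buckets in a fixed order and serve the first
    // non-empty bucket owned by the task.
    static String pickFixed(Map<Integer, Queue<String>> bucketSplits, int task, int parallelism) {
        for (Map.Entry<Integer, Queue<String>> e : bucketSplits.entrySet()) {
            if (e.getKey() % parallelism == task && !e.getValue().isEmpty()) {
                return e.getValue().poll();
            }
        }
        return null; // nothing left for this task
    }

    // Round-robin variant: resume scanning after the bucket this task was served
    // last (wrapping around), so all buckets owned by the task take turns.
    static String pickFair(TreeMap<Integer, Queue<String>> bucketSplits, int task,
                           int parallelism, Map<Integer, Integer> lastBucket) {
        List<Integer> owned = new ArrayList<>();
        for (int bucket : bucketSplits.keySet()) {
            if (bucket % parallelism == task) {
                owned.add(bucket);
            }
        }
        int last = lastBucket.getOrDefault(task, -1);
        int start = 0;
        while (start < owned.size() && owned.get(start) <= last) {
            start++; // first owned bucket strictly after `last`
        }
        for (int i = 0; i < owned.size(); i++) {
            int bucket = owned.get((start + i) % owned.size());
            Queue<String> splits = bucketSplits.get(bucket);
            if (!splits.isEmpty()) {
                lastBucket.put(task, bucket);
                return splits.poll(); // FIFO poll keeps the order within a bucket
            }
        }
        return null;
    }

    // Builds buckets 0..numBuckets-1, each holding splits "b<bucket>-s<index>".
    static TreeMap<Integer, Queue<String>> buildSplits(int numBuckets, int splitsPerBucket) {
        TreeMap<Integer, Queue<String>> result = new TreeMap<>();
        for (int b = 0; b < numBuckets; b++) {
            Queue<String> queue = new ArrayDeque<>();
            for (int s = 0; s < splitsPerBucket; s++) {
                queue.add("b" + b + "-s" + s);
            }
            result.put(b, queue);
        }
        return result;
    }

    // Simulates `requests` split requests from task 0 with 4 buckets of 3 splits each.
    static List<String> run(boolean fair, int requests) {
        TreeMap<Integer, Queue<String>> splits = buildSplits(4, 3);
        Map<Integer, Integer> lastBucket = new HashMap<>();
        List<String> assigned = new ArrayList<>();
        for (int r = 0; r < requests; r++) {
            assigned.add(fair ? pickFair(splits, 0, 2, lastBucket) : pickFixed(splits, 0, 2));
        }
        return assigned;
    }

    public static void main(String[] args) {
        System.out.println("fixed: " + run(false, 4)); // fixed: [b0-s0, b0-s1, b0-s2, b2-s0]
        System.out.println("fair:  " + run(true, 4));  // fair:  [b0-s0, b2-s0, b0-s1, b2-s1]
    }
}
{code}

With the fixed order, bucket 2 only gets its first split after bucket 0 is drained; with the rotation, buckets 0 and 2 alternate. Keeping the rotation state per task (rather than a single global cursor) means the bucket-to-task mapping never changes, which is what the ordering requirement needs.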
--
This message was sent by Atlassian Jira
(v8.20.10#820010)