[ 
https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17687041#comment-17687041
 ] 

ming li commented on FLINK-30985:
---------------------------------

[~lzljs3620320] Consider an extreme case: suppose I have only one source task
and there are 2 buckets in the table store.

The split of {{bucket-1}} in {{snapshot-1}} is assigned first. When the task
finishes consuming it, it requests the next split. Assuming that consumption is
slower than production, a new split for {{bucket-1}} has already been generated
in {{snapshot-2}} by this time. According to the current allocation algorithm,
we will assign this {{bucket-1}} split instead of the pending {{bucket-2}}
split, so {{bucket-2}} is never consumed (or, equivalently, we can never finish
consuming the complete data of {{snapshot-1}}?).
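
The scenario above can be reproduced with a small, self-contained Java simulation. It does not use any Flink classes: the class {{FixedOrderStarvation}}, its {{assignOne}} and {{simulate}} methods, and the "one split consumed per snapshot" rate are hypothetical. A {{TreeMap}} stands in for the enumerator's {{HashMap}} only to make the fixed traversal order explicit, and with a single task every bucket maps to that task, so the task check is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

public class FixedOrderStarvation {

    /** Mimics the current behaviour: the first non-empty bucket in map order always wins. */
    static String assignOne(Map<Integer, Queue<String>> bucketSplits) {
        for (Queue<String> splits : bucketSplits.values()) {
            if (!splits.isEmpty()) {
                return splits.poll();
            }
        }
        return null;
    }

    /** One reader, two buckets; each snapshot adds one split per bucket, but only one is consumed. */
    static List<String> simulate(int snapshots) {
        Map<Integer, Queue<String>> bucketSplits = new TreeMap<>();
        bucketSplits.put(1, new ArrayDeque<>());
        bucketSplits.put(2, new ArrayDeque<>());
        List<String> assigned = new ArrayList<>();
        for (int snapshot = 1; snapshot <= snapshots; snapshot++) {
            // Production outpaces consumption: every bucket gets a new split per snapshot.
            for (Map.Entry<Integer, Queue<String>> e : bucketSplits.entrySet()) {
                e.getValue().add("snapshot-" + snapshot + "/bucket-" + e.getKey());
            }
            // The single reader finishes one split and requests the next one.
            assigned.add(assignOne(bucketSplits));
        }
        return assigned;
    }

    public static void main(String[] args) {
        simulate(3).forEach(System.out::println);
    }
}
```

Running {{main}} prints {{snapshot-1/bucket-1}}, {{snapshot-2/bucket-1}} and {{snapshot-3/bucket-1}}: the pending {{bucket-2}} splits are never assigned.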

Please let me know if you notice any mistakes or omissions. Thanks.

> [Flink][Table Store] Change the Splits allocation algorithm of 
> ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30985
>                 URL: https://issues.apache.org/jira/browse/FLINK-30985
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: ming li
>            Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in
> {{TableStore}} works by traversing the {{HashMap}} of buckets, but since the
> number of buckets is fixed, the traversal order is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the
>                     // same bucket is given to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // if the reader that requested another split has failed
>                         // in the meantime, remove it from the list of waiting
>                         // readers
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }
> {code}
> Assume that a {{task}} consumes multiple buckets and each {{bucket}} always has
> enough pending splits. Then the split of the first {{bucket}} in the traversal
> order will always be assigned to the task, while the other buckets may not be
> consumed for a long time. This results in uneven consumption and makes it
> difficult to advance the {{watermark}}. So I think we should change the split
> allocation algorithm to a fair one.
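
One possible fair variant, sketched here under assumptions and not taken from the Flink code base, is per-task round-robin over buckets: once a bucket has been served, it is moved to the end of the traversal order, so the next request from the same task visits that task's other buckets first. The class {{RoundRobinAssigner}} and its methods are hypothetical; an insertion-ordered {{LinkedHashMap}} is used because removing a key and re-inserting it places it at the tail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class RoundRobinAssigner {

    // Insertion-ordered: removing and re-inserting a bucket moves it to the tail.
    private final LinkedHashMap<Integer, Queue<String>> bucketSplits = new LinkedHashMap<>();

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    /** Assigns one split to the reader of {@code task}, or returns null if none is pending. */
    String assignOne(int task, int parallelism) {
        Iterator<Map.Entry<Integer, Queue<String>>> it = bucketSplits.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Queue<String>> entry = it.next();
            int bucket = entry.getKey();
            Queue<String> splits = entry.getValue();
            // Same bucket-to-task mapping as the original code, so a bucket stays on one task.
            if (bucket % parallelism == task && !splits.isEmpty()) {
                String split = splits.poll();
                // Rotate the served bucket to the tail; we return right away, so the
                // iterator is not used after the map is modified.
                it.remove();
                bucketSplits.put(bucket, splits);
                return split;
            }
        }
        return null;
    }

    /** One reader, two buckets; each snapshot adds one split per bucket, but only one is consumed. */
    static List<String> simulate(int snapshots) {
        RoundRobinAssigner assigner = new RoundRobinAssigner();
        List<String> assigned = new ArrayList<>();
        for (int snapshot = 1; snapshot <= snapshots; snapshot++) {
            for (int bucket = 1; bucket <= 2; bucket++) {
                assigner.addSplit(bucket, "snapshot-" + snapshot + "/bucket-" + bucket);
            }
            assigned.add(assigner.assignOne(0, 1));
        }
        return assigned;
    }

    public static void main(String[] args) {
        simulate(4).forEach(System.out::println);
    }
}
```

With the same one-reader, two-bucket workload as in the comment, {{simulate(4)}} yields {{snapshot-1/bucket-1}}, {{snapshot-1/bucket-2}}, {{snapshot-2/bucket-1}}, {{snapshot-2/bucket-2}}. Splits within a bucket are still polled in FIFO order, so the per-bucket consumption order that the comment in {{assignSplits}} protects is preserved.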



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
