[
https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17687041#comment-17687041
]
ming li commented on FLINK-30985:
---------------------------------
[~lzljs3620320] Consider an extreme case: I have only one source task, and
there are 2 buckets in the table store.
The split of {{bucket-1}} from {{snapshot-1}} is assigned first. When the task
finishes consuming it, it requests the next split. Assuming that consumption is
slower than production, a new split has already been generated in {{bucket-1}}
by {{snapshot-2}} at this point.
According to the allocation algorithm, we will assign that {{bucket-1}} split
instead of the {{bucket-2}} split, so {{bucket-2}} is never consumed (or, put
differently, we cannot consume the complete data of {{snapshot-1}}?).
Please let me know if you notice any mistakes or omissions. Thanks.
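To make the scenario concrete, here is a small standalone simulation. It is only a sketch under assumptions: {{StarvationDemo}}, {{simulate}} and the split names are made up for illustration, a {{LinkedHashMap}} stands in for the fixed traversal order of the real map, and no Flink or Table Store classes are used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical demo class, not part of Table Store.
final class StarvationDemo {

    /** Returns the bucket served on each request when buckets are always scanned in the same order. */
    static List<Integer> simulate(int requests) {
        // LinkedHashMap models a map whose traversal order never changes.
        Map<Integer, Queue<String>> bucketSplits = new LinkedHashMap<>();
        bucketSplits.put(1, new ArrayDeque<>(List.of("snapshot-1/bucket-1")));
        bucketSplits.put(2, new ArrayDeque<>(List.of("snapshot-1/bucket-2")));

        List<Integer> served = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            // The single reader asks for one split: the first non-empty bucket wins.
            for (Map.Entry<Integer, Queue<String>> entry : bucketSplits.entrySet()) {
                if (!entry.getValue().isEmpty()) {
                    entry.getValue().poll();
                    served.add(entry.getKey());
                    break;
                }
            }
            // Production outpaces consumption: the next snapshot adds a new
            // bucket-1 split before the reader comes back.
            bucketSplits.get(1).add("snapshot-" + (i + 2) + "/bucket-1");
        }
        return served;
    }

    public static void main(String[] args) {
        // Every request is served from bucket 1; bucket 2 is never reached.
        System.out.println(simulate(4)); // prints [1, 1, 1, 1]
    }
}
```

In this model the {{snapshot-1}} split of {{bucket-2}} stays queued for as long as {{bucket-1}} keeps receiving new splits.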
> [Flink][Table Store] Change the Splits allocation algorithm of
> ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30985
> URL: https://issues.apache.org/jira/browse/FLINK-30985
> Project: Flink
> Issue Type: Improvement
> Components: Table Store
> Reporter: ming li
> Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in
> {{TableStore}} is performed by traversing the {{HashMap}}, but since the
> number of buckets is fixed, the order of traversal is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the same bucket is given
>                     // to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // if the reader that requested another split has failed in the
>                         // meantime, remove it from the list of waiting readers
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }{code}
> Assume that a {{task}} consumes multiple {{buckets}} and there are enough
> splits in each {{bucket}}. Then the first {{bucket}} will always be assigned
> to the task, and the other buckets may not be consumed for a long time,
> resulting in uneven consumption and difficulty in advancing the
> {{watermark}}. So I think we should change the split allocation algorithm
> to a fair algorithm.
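
One possible shape of a "fair" algorithm is a per-task round-robin over the buckets that task owns. The sketch below is only an illustration under assumptions, not the change actually proposed for {{ContinuousFileSplitEnumerator}}: {{FairSplitAssigner}}, {{addSplit}} and {{nextSplit}} are hypothetical names, splits are plain strings, and the {{bucket % parallelism}} ownership rule is the only part taken from the code above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical helper, not part of Table Store.
final class FairSplitAssigner {
    // Sorted by bucket id so "the next bucket after the last served one" is well defined.
    private final TreeMap<Integer, Queue<String>> bucketSplits = new TreeMap<>();
    // For each task, the bucket it was served from most recently.
    private final Map<Integer, Integer> lastServed = new HashMap<>();
    private final int parallelism;

    FairSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    /** Returns the next split for the task, rotating over its buckets, or null if none is pending. */
    String nextSplit(int task) {
        int last = lastServed.getOrDefault(task, -1);
        Integer firstCandidate = null; // lowest eligible bucket, used when wrapping around
        Integer chosen = null;         // lowest eligible bucket strictly after the last served one
        for (Map.Entry<Integer, Queue<String>> entry : bucketSplits.entrySet()) {
            int bucket = entry.getKey();
            // Same ownership rule as in assignSplits: one bucket always maps to one task.
            if (bucket % parallelism != task || entry.getValue().isEmpty()) {
                continue;
            }
            if (firstCandidate == null) {
                firstCandidate = bucket;
            }
            if (bucket > last) {
                chosen = bucket;
                break;
            }
        }
        if (chosen == null) {
            chosen = firstCandidate; // wrap around to the lowest eligible bucket
        }
        if (chosen == null) {
            return null; // nothing pending for this task
        }
        lastServed.put(task, chosen);
        return bucketSplits.get(chosen).poll();
    }

    public static void main(String[] args) {
        FairSplitAssigner assigner = new FairSplitAssigner(1);
        assigner.addSplit(0, "snapshot-1/bucket-0");
        assigner.addSplit(1, "snapshot-1/bucket-1");
        assigner.addSplit(0, "snapshot-2/bucket-0");
        // Bucket 1 is served before the second split of bucket 0.
        System.out.println(assigner.nextSplit(0)); // snapshot-1/bucket-0
        System.out.println(assigner.nextSplit(0)); // snapshot-1/bucket-1
        System.out.println(assigner.nextSplit(0)); // snapshot-2/bucket-0
    }
}
```

Splits within one bucket are still handed out in arrival order, so per-bucket ordering is preserved. Whether rotation alone is enough to keep snapshots complete across buckets is a separate question that this sketch does not settle.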