ming li created FLINK-30985:
-------------------------------
Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
Key: FLINK-30985
URL: https://issues.apache.org/jira/browse/FLINK-30985
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: ming li
Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in {{TableStore}} assigns splits by traversing the {{HashMap}} {{bucketSplits}}. Because the set of buckets is fixed, the traversal order is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return; // inside forEach, this skips to the next bucket
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
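To make the fixed traversal order concrete, here is a minimal, self-contained model of the loop above. It assumes parallelism 2 (so task 0 owns buckets 0 and 2), plain string splits, and task 0 issuing one request per round; {{UnfairAssignmentDemo}} and {{simulate}} are hypothetical names, and the real {{SplitEnumeratorContext}} and the registered-reader check are left out. It also relies on {{HashMap}} iterating small {{Integer}} keys in ascending order, which is what OpenJDK does today but is an implementation detail, not a documented guarantee.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of assignSplits(): string splits, a fixed
// parallelism instead of SplitEnumeratorContext, and no reader-failure check.
class UnfairAssignmentDemo {

    // Returns the splits that task 0 receives over `requests` rounds, where
    // task 0 asks for exactly one split per round.
    static List<String> simulate(int parallelism, int splitsPerBucket, int requests) {
        Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
        for (int bucket = 0; bucket < 2 * parallelism; bucket++) {
            Queue<String> splits = new ArrayDeque<>();
            for (int i = 0; i < splitsPerBucket; i++) {
                splits.add("b" + bucket + "-s" + i);
            }
            bucketSplits.put(bucket, splits);
        }

        List<String> received = new ArrayList<>();
        for (int round = 0; round < requests; round++) {
            Set<Integer> readersAwaitingSplit = new HashSet<>();
            readersAwaitingSplit.add(0); // task 0 has one outstanding request
            bucketSplits.forEach(
                    (bucket, splits) -> {
                        if (!splits.isEmpty()) {
                            int task = bucket % parallelism;
                            // The first bucket of task 0 in iteration order consumes
                            // the request; later buckets of task 0 find it gone.
                            if (readersAwaitingSplit.remove(task)) {
                                received.add(splits.poll());
                            }
                        }
                    });
        }
        return received;
    }
}
{code}

With {{simulate(2, 3, 3)}}, task 0 receives all three splits of bucket 0 while bucket 2 gets nothing until bucket 0 is drained.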
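Assume that a {{task}} consumes multiple {{buckets}} and that each {{bucket}} has enough pending splits. Once {{readersAwaitingSplit.remove(task)}} succeeds for the first bucket mapped to that task, the request is used up, so the later buckets mapped to the same task are skipped in that pass. As a result, the first {{bucket}} in traversal order always receives the task's requests, while the other buckets may not be consumed for a long time. This leads to uneven consumption and makes it difficult to advance the {{watermark}}. So I think we should change the split allocation algorithm to a fair one.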
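One possible fair scheme is a per-task round-robin over the buckets that task owns: each request starts from the bucket after the one served last, so every non-empty bucket of a task gets a split in turn. The sketch below keeps the existing {{bucket % parallelism}} mapping, so a bucket is still read by a single task and its order is preserved. {{RoundRobinSplitAssigner}}, {{addSplit}}, {{nextSplit}} and {{lastServedBucket}} are hypothetical names, not Table Store code; {{readersAwaitingSplit}} and the registered-reader check are omitted and would wrap {{nextSplit}} in a real enumerator.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical sketch of a round-robin split assigner, not the Table Store code.
class RoundRobinSplitAssigner {
    private final int parallelism;
    // Sorted by bucket id so each task walks its buckets in a stable order.
    private final TreeMap<Integer, Queue<String>> bucketSplits = new TreeMap<>();
    // For each task, the bucket that most recently received a split for it.
    private final Map<Integer, Integer> lastServedBucket = new HashMap<>();

    RoundRobinSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    // Returns the next split for `task`, or null if none of its buckets has one.
    String nextSplit(int task) {
        // Same mapping as today: a bucket is always read by the same task.
        List<Integer> owned = new ArrayList<>();
        for (int bucket : bucketSplits.keySet()) {
            if (bucket % parallelism == task) {
                owned.add(bucket);
            }
        }
        // Start at the first owned bucket after the last one served, wrapping to index 0.
        int last = lastServedBucket.getOrDefault(task, -1);
        int start = 0;
        for (int i = 0; i < owned.size(); i++) {
            if (owned.get(i) > last) {
                start = i;
                break;
            }
        }
        // Try each owned bucket once, beginning at `start`.
        for (int k = 0; k < owned.size(); k++) {
            int bucket = owned.get((start + k) % owned.size());
            Queue<String> splits = bucketSplits.get(bucket);
            if (!splits.isEmpty()) {
                lastServedBucket.put(task, bucket);
                return splits.poll();
            }
        }
        return null;
    }
}
{code}

With parallelism 2 and splits queued in buckets 0 and 2, successive {{nextSplit(0)}} calls alternate between the two buckets (bucket 0, bucket 2, bucket 0, ...) instead of draining bucket 0 first, so both buckets keep making progress.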
--
This message was sent by Atlassian Jira
(v8.20.10#820010)