[
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732424#comment-17732424
]
Rui Fan commented on FLINK-31655:
---------------------------------
Hi [~tartarus], some of our Flink scenarios also need adaptive rebalance. For
example, when sinking data to HDFS, some subtasks may hit a slow DataNode, and
Flink can forward their data to idle subtasks instead.
We have finished an internal version, and it works well. I read all the
discussions in detail and I want to share our solution here. It's similar to
your solution 1, but it solves the performance issue.
We defined an option: *dynamic-rebalance.max-traverse-size*. It means we don't
traverse all sub-partitions; we only traverse the next maxTraverseSize channels
after the current one. In general, we find the most idle channel among
channel[currentChannel+1] to channel[currentChannel+maxTraverseSize].
For example, with *dynamic-rebalance.max-traverse-size=10*, *sub-partitions size =
100* and {*}currentChannel=5{*}: when sending a record, we find the most
idle channel among channel-6 to channel-15 instead of all channels, and mark that
channel as the new currentChannel.
h2. Why do we choose this solution?
# In our production, only a small number of subtasks are very slow, while the
large majority are healthy, so max-traverse-size=10 is enough to skip the slow
subtasks.
# It solves the performance issue of solution 1.
# Unlike solution 2, it doesn't introduce an extra lock or a collection to
maintain the set of idle channels.
# We improved the *max-traverse-size* strategy: we choose a channel directly as
soon as we find one with a pending size of 0 among channel[currentChannel+1] to
channel[currentChannel+maxTraverseSize], because a channel with pending size 0
is already the most idle channel possible.
Here is the core code:
{code:java}
private void chooseLessLoadedSubPartition() {
    long bytesInQueue = Long.MAX_VALUE;
    int candidateChannel = 0;
    for (int i = 1; i <= maxTraverseSize; i++) {
        int channel = (currentChannel + i) % numberOfSubpartitions;
        long bytesInQueueCurrent =
                targetPartition.getBytesInQueueUnsafe(channel);
        if (bytesInQueueCurrent == 0) {
            // If there isn't any pending data in this channel, choose it
            // directly.
            currentChannel = channel;
            return;
        }
        if (bytesInQueueCurrent < bytesInQueue) {
            candidateChannel = channel;
            bytesInQueue = bytesInQueueCurrent;
        }
    }
    currentChannel = candidateChannel;
}{code}
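To make the selection logic easier to try out in isolation, here is a minimal standalone sketch. It is not the production code: the pending byte counts come from a plain array instead of targetPartition.getBytesInQueueUnsafe, and the class name DynamicRebalanceSketch is made up for illustration.
{code:java}
/**
 * A standalone sketch of the max-traverse-size channel selection.
 * Pending bytes per sub-partition are simulated with a plain array.
 */
public class DynamicRebalanceSketch {
    private final long[] pendingBytes;  // pending bytes per sub-partition
    private final int maxTraverseSize;  // dynamic-rebalance.max-traverse-size
    private int currentChannel;         // starts at 0

    public DynamicRebalanceSketch(long[] pendingBytes, int maxTraverseSize) {
        this.pendingBytes = pendingBytes;
        this.maxTraverseSize = maxTraverseSize;
    }

    /**
     * Scans the next maxTraverseSize channels after currentChannel and
     * picks the least loaded one, stopping early on an empty channel.
     */
    public int chooseLessLoadedSubPartition() {
        int numberOfSubpartitions = pendingBytes.length;
        long bytesInQueue = Long.MAX_VALUE;
        int candidateChannel = 0;
        for (int i = 1; i <= maxTraverseSize; i++) {
            int channel = (currentChannel + i) % numberOfSubpartitions;
            long bytesInQueueCurrent = pendingBytes[channel];
            if (bytesInQueueCurrent == 0) {
                // A channel with no pending data is already the most idle
                // channel possible; stop scanning early.
                currentChannel = channel;
                return currentChannel;
            }
            if (bytesInQueueCurrent < bytesInQueue) {
                candidateChannel = channel;
                bytesInQueue = bytesInQueueCurrent;
            }
        }
        currentChannel = candidateChannel;
        return currentChannel;
    }

    public static void main(String[] args) {
        // 8 sub-partitions; channel 3 has no pending data, channel 6 is slow.
        long[] pending = {5, 9, 7, 0, 4, 8, 900, 6};
        DynamicRebalanceSketch selector = new DynamicRebalanceSketch(pending, 4);
        // Window is channels 1..4: channel 3 has 0 pending bytes, chosen early.
        System.out.println(selector.chooseLessLoadedSubPartition()); // 3
        // Next window is channels 4..7: no empty channel, channel 4 is least loaded.
        System.out.println(selector.chooseLessLoadedSubPartition()); // 4
    }
}
{code}
Note how the slow channel 6 (900 pending bytes) is skipped in the second call even though it lies inside the traverse window.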
Looking forward to your feedback and suggestions, thanks.
> Adaptive Channel selection for partitioner
> ------------------------------------------
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: tartarus
> Assignee: tartarus
> Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the
> same, then by default the RebalancePartitioner will be used to select the
> target channel.
> In our company, users often use Flink to access Redis, HBase or other RPC
> services. If some of the operators are slow to return requests (for external
> service reasons), then because Rebalance/Rescale use a round-robin channel
> selection policy, the job easily runs into backpressure.
> Because the Rebalance/Rescale policy does not care which downstream subtask
> the data is sent to, we expect Rebalance/Rescale to take the processing
> power of the downstream subtasks into account when choosing a channel.
> Sending more data to free subtasks ensures the best possible throughput
> of the job!
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)