[ 
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732424#comment-17732424
 ] 

Rui Fan commented on FLINK-31655:
---------------------------------

Hi [~tartarus] , some of our Flink scenarios also need adaptive 
rebalance. For example, when sinking data to HDFS, some subtasks may hit a slow 
DataNode, and Flink can forward their data to idle subtasks.

We have finished an internal version, and it works well. I read all the 
discussions in detail and want to share our solution here. It's similar to your 
solution 1, but it solves the performance issue.

We defined an option: *dynamic-rebalance.max-traverse-size*. It means we don't 
traverse all sub-partitions; we only traverse the next maxTraverseSize channels 
starting from the current channel. In the general case, we find the most idle 
channel among channel[currentChannel+1] to channel[currentChannel+maxTraverseSize].

For example, *dynamic-rebalance.max-traverse-size=10* , *sub-partitions size = 
100* and {*}currentChannel=5{*}. When sending a record, we find the most 
idle channel among channel-6 to channel-15 instead of all channels, and mark the 
most idle channel as the new currentChannel.
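To make the window concrete, here is a minimal, self-contained sketch of how the traversal window wraps around with modulo arithmetic. The class and method names (TraverseWindow, window) are illustrative only and not part of the actual patch:
{code:java}
// Sketch of the traversal window: starting after currentChannel, we inspect
// the next maxTraverseSize channels, wrapping around via modulo.
public class TraverseWindow {

    // Returns the candidate channels inspected for one record.
    static int[] window(int currentChannel, int maxTraverseSize, int numberOfSubpartitions) {
        int[] candidates = new int[maxTraverseSize];
        for (int i = 1; i <= maxTraverseSize; i++) {
            candidates[i - 1] = (currentChannel + i) % numberOfSubpartitions;
        }
        return candidates;
    }

    public static void main(String[] args) {
        // With maxTraverseSize=10, 100 sub-partitions and currentChannel=5,
        // the window is channels 6..15, matching the example above.
        System.out.println(java.util.Arrays.toString(window(5, 10, 100)));
    }
}
{code}
Note that near the end of the channel range the window wraps: with currentChannel=99 and 100 sub-partitions, the first candidate is channel-0.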
h2. Why do we choose this solution?
 # In our production, only a small number of subtasks are very slow, and the 
large majority are healthy. max-traverse-size=10 is enough to skip these slow 
subtasks.
 # It solves the performance issue of solution 1.
 # It doesn't introduce an extra lock or collection to maintain the idle channel 
set, unlike solution 2.
 # We improved the *max-traverse-size* strategy: we choose a channel directly 
if we find that its pending size is 0, for any channel from 
channel[currentChannel+1] to channel[currentChannel+maxTraverseSize], because 
a channel with pending size 0 is the most idle channel possible.

Here is the core code:
{code:java}
private void chooseLessLoadedSubPartition() {
    long bytesInQueue = Long.MAX_VALUE;
    int candidateChannel = 0;
    for (int i = 1; i <= maxTraverseSize; i++) {
        int channel = (currentChannel + i) % numberOfSubpartitions;
        long bytesInQueueCurrent = targetPartition.getBytesInQueueUnsafe(channel);

        if (bytesInQueueCurrent == 0) {
            // If there isn't any pending data in this channel, choose it
            // directly: an empty queue cannot be beaten.
            currentChannel = channel;
            return;
        }
        if (bytesInQueueCurrent < bytesInQueue) {
            candidateChannel = channel;
            bytesInQueue = bytesInQueueCurrent;
        }
    }
    currentChannel = candidateChannel;
} {code}
 

Looking forward to your feedback and suggestions, thanks.

> Adaptive Channel selection for partitioner
> ------------------------------------------
>
>                 Key: FLINK-31655
>                 URL: https://issues.apache.org/jira/browse/FLINK-31655
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the 
> same, then by default the RebalancePartitioner will be used to select the 
> target channel.
> In our company, users often use Flink to access Redis, HBase or other RPC 
> services. If some of the operators are slow to return requests (for external 
> service reasons), then because Rebalance/Rescale use a round-robin channel 
> selection policy, the job easily backpressures.
> Because the Rebalance/Rescale policy does not care which downstream subtask 
> the data is sent to, we expect Rebalance/Rescale to consider the processing 
> power of the downstream subtask when choosing a channel.
> Sending more data to the free subtasks ensures the best possible throughput 
> of the job!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
