[ https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738053#comment-17738053 ]

Piotr Nowojski commented on FLINK-31655:
----------------------------------------

I also like the idea of a {{maxTraverseSize}} option. I have a feeling that in 
the real world, values as small as 5 could be good enough. If not, and we 
really need values of ~20 or larger, we could also consider something like:
* For the {{N}}'th {{adaptiveRebalance}} call, where {{N % 10 == 1}}:
** iterate over 20 channels
** discard the 10 worst channels and use the best one
* For the {{N}}'th {{adaptiveRebalance}} call, where {{N % 10 != 1}}:
** use the remaining 9 best channels from the last iteration
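A minimal Java sketch of the amortized variant above, just to make the bookkeeping concrete. Everything in it is hypothetical: {{AmortizedAdaptiveSelector}}, the {{backlogOf}} probe (assumed to return something like the number of queued buffers for a channel) and the choice to advance the scan start by {{keepSize}} positions per batch are assumptions for illustration, not Flink's {{RebalancePartitioner}} API or the FLIP's design.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.function.IntUnaryOperator;

/**
 * Hypothetical sketch (not a Flink API): every keepSize-th call scans scanSize channels,
 * keeps the keepSize least-backlogged ones and then serves them in order.
 */
final class AmortizedAdaptiveSelector {
    private final int numChannels;
    private final int scanSize;               // 20 in the scheme above
    private final int keepSize;               // 10 in the scheme above
    private final IntUnaryOperator backlogOf; // assumed probe: queued buffers per channel
    private final Deque<Integer> cachedBest = new ArrayDeque<>();
    private int nextStart = 0;

    AmortizedAdaptiveSelector(int numChannels, int scanSize, int keepSize, IntUnaryOperator backlogOf) {
        if (keepSize < 1 || keepSize > scanSize || scanSize > numChannels) {
            throw new IllegalArgumentException("require 1 <= keepSize <= scanSize <= numChannels");
        }
        this.numChannels = numChannels;
        this.scanSize = scanSize;
        this.keepSize = keepSize;
        this.backlogOf = backlogOf;
    }

    int selectChannel() {
        // The cache is empty on calls 1, 1 + keepSize, 1 + 2 * keepSize, ...,
        // i.e. exactly when N % 10 == 1 for keepSize == 10.
        if (cachedBest.isEmpty()) {
            rescan();
        }
        return cachedBest.pollFirst();
    }

    private void rescan() {
        Integer[] window = new Integer[scanSize];
        for (int i = 0; i < scanSize; i++) {
            window[i] = (nextStart + i) % numChannels;
        }
        // Arrays.sort on objects is stable, so ties keep round-robin order.
        Arrays.sort(window, Comparator.comparingInt((Integer ch) -> backlogOf.applyAsInt(ch)));
        // Discard the (scanSize - keepSize) worst channels, keep the best ones in order.
        for (int i = 0; i < keepSize; i++) {
            cachedBest.addLast(window[i]);
        }
        // Assumption: advance the start by one position per call this batch will serve.
        nextStart = (nextStart + keepSize) % numChannels;
    }
}
```

With {{scanSize = 20}} and {{keepSize = 10}}, only one call in ten pays for the 20-channel scan; the other nine reuse a backlog snapshot that may be up to nine calls old, which is the trade-off this variant makes.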

But I doubt it's really needed. Even with {{maxTraverseSize==5}}, we can always 
completely skip up to 4 slow channels if they are completely bottlenecked and 
bundled together. If more slow channels are bundled together, those channels 
would only be getting {{20%}} of the load of the fast channels. So this 
solution would still work perfectly fine if the "slow" machine(s) are 5x 
slower than the normal ones.
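To illustrate the skipping argument above, here is a hypothetical sketch of a bounded traversal: start from the round-robin position, look at up to {{maxTraverseSize}} consecutive channels and pick the least backlogged one. The class name, the {{backlogOf}} probe and the tie-breaking rule (the first channel in the window wins) are assumptions, not the proposed implementation. With {{maxTraverseSize == 5}}, any window of 5 consecutive channels contains at least one channel outside a contiguous block of 4 saturated ones, so those 4 are never picked.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical sketch: round-robin start, then pick the least backlogged of the next maxTraverseSize channels. */
final class BoundedTraverseSelector {
    private final int numChannels;
    private final int maxTraverseSize;
    private final IntUnaryOperator backlogOf; // assumed probe: queued buffers per channel
    private int nextStart = 0;

    BoundedTraverseSelector(int numChannels, int maxTraverseSize, IntUnaryOperator backlogOf) {
        if (numChannels < 1 || maxTraverseSize < 1) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        this.numChannels = numChannels;
        this.maxTraverseSize = Math.min(maxTraverseSize, numChannels);
        this.backlogOf = backlogOf;
    }

    int selectChannel() {
        int best = nextStart;
        int bestBacklog = backlogOf.applyAsInt(best);
        // Inspect the rest of the window, wrapping around the channel ring.
        for (int i = 1; i < maxTraverseSize; i++) {
            int ch = (nextStart + i) % numChannels;
            int backlog = backlogOf.applyAsInt(ch);
            if (backlog < bestBacklog) { // strict '<': ties keep the earlier channel
                best = ch;
                bestBacklog = backlog;
            }
        }
        // Plain round-robin advance of the window start, one position per call.
        nextStart = (nextStart + 1) % numChannels;
        return best;
    }
}
```

The checks below use a fixed backlog snapshot; in a running job the fast channels' backlogs grow as they receive data, so this only shows which channels are eligible, not the resulting load split.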

I'm also looking forward to the FLIP. If I haven't made a mistake in the 
previous paragraph, I think it would be good to add it to the FLIP as a 
justification for why we don't need a large {{maxTraverseSize}}.

Btw, before publishing a FLIP, could you run the 
{{mapRebalanceMapSink.F27_UNBOUNDED}} benchmark from 
[flink-benchmarks|https://github.com/apache/flink-benchmarks] (you can check 
the readme and especially [the general 
remarks|https://github.com/apache/flink-benchmarks#general-remarks]) and 
compare the normal rebalance with your adaptive one? This benchmark is defined 
in the {{src/main/java/org/apache/flink/benchmark/InputBenchmark.java}} file.



> Adaptive Channel selection for partitioner
> ------------------------------------------
>
>                 Key: FLINK-31655
>                 URL: https://issues.apache.org/jira/browse/FLINK-31655
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the 
> same, then by default the RebalancePartitioner will be used to select the 
> target channel.
> In our company, users often use Flink to access Redis, HBase or other RPC 
> services. If some of the operators are slow to return requests (for external 
> service reasons), the job easily becomes backpressured, because 
> Rebalance/Rescale use a round-robin channel selection policy.
> Since the Rebalance/Rescale policy does not care which downstream subtask the 
> data is sent to, we would like Rebalance/Rescale to take the processing 
> power of the downstream subtasks into account when choosing a channel, and 
> send more data to the idle subtasks, aiming for the best possible throughput 
> of the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
