[
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733425#comment-17733425
]
tartarus commented on FLINK-31655:
----------------------------------
[~fanrui] Thank you very much for sharing your solution!
Internally, I also optimized solution 1: instead of finding the globally
optimal channel, it finds the best channel among most of the channels.
I ran a performance test of the channel selection policies and measured the
time taken per call over 10,000 calls, as follows:
*Time per call in nanoseconds* (each column header is the number of subpartitions followed by the statistic: average, P95, or P99):
||subpartition number||{color:#de350b}10(avg){color}||10(P95)||10(P99)||{color:#de350b}100(avg){color}||100(P95)||100(P99)||{color:#de350b}200(avg){color}||200(P95)||200(P99)||{color:#de350b}300(avg){color}||300(P95)||300(P99)||{color:#de350b}500(avg){color}||500(P95)||500(P99)||{color:#de350b}800(avg){color}||800(P95)||800(P99)||{color:#de350b}1000(avg){color}||1000(P95)||1000(P99)||{color:#de350b}2000(avg){color}||2000(P95)||2000(P99)||{color:#de350b}3000(avg){color}||3000(P95)||3000(P99)||{color:#de350b}5000(avg){color}||5000(P95)||5000(P99)||{color:#de350b}8000(avg){color}||8000(P95)||8000(P99)||{color:#de350b}10000(avg){color}||10000(P95)||10000(P99)||
|Rebalance|{color:#de350b}68{color}|90|102|{color:#de350b}66{color}|94|116|{color:#de350b}69{color}|86|96|{color:#de350b}68{color}|104|199|{color:#de350b}68{color}|89|99|{color:#de350b}74{color}|118|131|{color:#de350b}67{color}|88|99|{color:#de350b}67{color}|90|104|{color:#de350b}58{color}|93|110|{color:#de350b}70{color}|89|95|{color:#de350b}61{color}|81|92|{color:#de350b}63{color}|86|99|
|solution 1 (Global Optimal)|{color:#de350b}203{color}|372|1411|{color:#de350b}836{color}|1285|3488|{color:#de350b}1152{color}|2419|6547|{color:#de350b}1486{color}|3672|9346|{color:#de350b}1926{color}|5105|12107|{color:#de350b}2899{color}|9285|21338|{color:#de350b}4569{color}|13939|25574|{color:#de350b}8023{color}|7929|24980|{color:#de350b}11701{color}|12371|34079|{color:#de350b}19364{color}|19910|53594|{color:#de350b}32792{color}|42199|104582|{color:#de350b}37397{color}|39548|73330|
|optimized solution 1 (Most of the best)|{color:#de350b}214{color}|360|1652|{color:#de350b}1223{color}|1447|4509|{color:#de350b}1508{color}|2613|5816|{color:#de350b}1808{color}|3815|8756|{color:#de350b}2124{color}|5962|12336|{color:#de350b}3048{color}|9012|21896|{color:#de350b}3277{color}|10599|28300|{color:#de350b}7403{color}|18832|37264|{color:#de350b}10543{color}|22218|43845|{color:#de350b}15944{color}|20266|62732|{color:#de350b}28271{color}|41126|54055|{color:#de350b}36096{color}|52062|82243|
|Traverse 5 channels|{color:#de350b}213{color}|357|1268|{color:#de350b}190{color}|461|984|{color:#de350b}169{color}|290|941|{color:#de350b}239{color}|393|1221|{color:#de350b}220{color}|566|1240|{color:#de350b}213{color}|510|1100|{color:#de350b}209{color}|478|1041|{color:#de350b}230{color}|638|1336|{color:#de350b}190{color}|472|964|{color:#de350b}183{color}|388|945|{color:#de350b}178{color}|516|978|{color:#de350b}224{color}|511|1148|
|Traverse 10 channels|{color:#de350b}226{color}|440|1776|{color:#de350b}305{color}|536|1601|{color:#de350b}227{color}|477|1785|{color:#de350b}297{color}|520|1564|{color:#de350b}243{color}|507|1672|{color:#de350b}220{color}|472|1422|{color:#de350b}276{color}|487|1443|{color:#de350b}280{color}|536|2042|{color:#de350b}254{color}|535|1895|{color:#de350b}254{color}|495|1976|{color:#de350b}249{color}|472|1691|{color:#de350b}293{color}|548|1631|
|Traverse 20 channels|{color:#de350b}244{color}|495|1706|{color:#de350b}298{color}|421|973|{color:#de350b}296{color}|413|961|{color:#de350b}394{color}|725|844|{color:#de350b}279{color}|387|854|{color:#de350b}406{color}|695|845|{color:#de350b}360{color}|499|1144|{color:#de350b}335{color}|478|1112|{color:#de350b}302{color}|495|960|{color:#de350b}392{color}|703|844|{color:#de350b}400{color}|688|866|{color:#de350b}343{color}|681|1030|
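The comment does not say how the avg/P95/P99 figures were collected. A minimal sketch of one way to get such per-call numbers is below, assuming each call is timed individually with {{System.nanoTime()}} and percentiles use the nearest-rank method; the class and method names ({{SelectorLatency}}, {{measure}}, {{percentile}}) are hypothetical and not part of Flink.

{code:java}
import java.util.Arrays;
import java.util.function.IntSupplier;

/** Hypothetical harness: times each call of a channel selector, reports avg/P95/P99 in ns. */
public class SelectorLatency {

    // Written on every call so the JIT cannot eliminate the selector invocation.
    static volatile int blackhole;

    /** Nearest-rank percentile of an ascending-sorted array, for 0 < p <= 100. */
    static long percentile(long[] sorted, double p) {
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    /** Invokes the selector `calls` times and returns {avg, P95, P99} in nanoseconds. */
    static long[] measure(IntSupplier selector, int calls) {
        long[] samples = new long[calls];
        long sum = 0;
        for (int i = 0; i < calls; i++) {
            long start = System.nanoTime();
            blackhole = selector.getAsInt();
            samples[i] = System.nanoTime() - start;
            sum += samples[i];
        }
        Arrays.sort(samples);
        return new long[] {sum / calls, percentile(samples, 95), percentile(samples, 99)};
    }

    public static void main(String[] args) {
        int numChannels = 100;
        int[] next = {0};
        // Round-robin selection, like Rebalance: O(1) per call.
        IntSupplier roundRobin = () -> next[0] = (next[0] + 1) % numChannels;
        long[] r = measure(roundRobin, 10_000);
        System.out.printf("avg=%dns P95=%dns P99=%dns%n", r[0], r[1], r[2]);
    }
}
{code}

Timing single calls this way also measures the overhead of {{System.nanoTime()}} itself, so for operations well under a microsecond a harness such as JMH (with warm-up iterations) would give more trustworthy numbers.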
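The table shows that Rebalance costs O(1) per call, while solution 1 and its
optimized variant cost O(n) in the number of subpartitions, which is consistent
with expectations. The "Traverse N channels" rows, which inspect at most
`maxTraverseSize` channels per call, stay roughly flat (about 170 to 410 ns on
average) regardless of the number of subpartitions. With the `maxTraverseSize`
solution, users can choose the traverse size according to their actual
situation to make the best trade-off.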
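To make the O(n) versus O(maxTraverseSize) difference concrete, here is a minimal sketch under stated assumptions: each downstream channel exposes a backlog (for example, the number of queued buffers), the channel with the smallest backlog wins, and the bounded scan starts from a round-robin position as Rebalance does. The {{BoundedTraverseSelector}} class and the static {{backlog}} array are hypothetical stand-ins; in a real partitioner the backlog changes as buffers are consumed, and Flink's Rebalance also starts from a random channel rather than 0. Solution 1 itself is not described in this comment, so {{selectGlobalOptimal}} only assumes it picks the minimum-backlog channel over all channels.

{code:java}
/**
 * Hypothetical sketch of the "Traverse N channels" policy. Assumes each downstream
 * channel exposes a backlog (e.g. number of queued buffers); the smallest backlog wins.
 */
public class BoundedTraverseSelector {
    private final long[] backlog;      // stand-in for per-subpartition queue sizes
    private final int maxTraverseSize; // channels inspected per selection
    private int start = -1;            // round-robin start position, as in Rebalance

    BoundedTraverseSelector(long[] backlog, int maxTraverseSize) {
        this.backlog = backlog;
        // Clamp to [1, number of channels].
        this.maxTraverseSize = Math.max(1, Math.min(maxTraverseSize, backlog.length));
    }

    /** Inspects maxTraverseSize consecutive channels from the round-robin position: O(maxTraverseSize). */
    int selectChannel() {
        int n = backlog.length;
        start = (start + 1) % n;
        int best = start;
        for (int i = 1; i < maxTraverseSize; i++) {
            int candidate = (start + i) % n;
            if (backlog[candidate] < backlog[best]) {
                best = candidate;
            }
        }
        return best;
    }

    /** "Global optimal" variant: scans all n channels, so O(n) per selection. */
    static int selectGlobalOptimal(long[] backlog) {
        int best = 0;
        for (int c = 1; c < backlog.length; c++) {
            if (backlog[c] < backlog[best]) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] backlog = {5, 0, 9, 1, 7, 3};
        BoundedTraverseSelector selector = new BoundedTraverseSelector(backlog, 3);
        System.out.println(selector.selectChannel());     // 1 (scans channels 0..2)
        System.out.println(selector.selectChannel());     // 1 (scans channels 1..3)
        System.out.println(selector.selectChannel());     // 3 (scans channels 2..4)
        System.out.println(selectGlobalOptimal(backlog)); // 1 (scans all 6 channels)
    }
}
{code}

With {{maxTraverseSize = 1}} this degenerates to plain round-robin, and with {{maxTraverseSize >= n}} it becomes the full scan, so the parameter spans the range between Rebalance and solution 1.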
I suggest combining a configuration option and an API to set
`maxTraverseSize`. If `maxTraverseSize` is set through the API, it overrides
the option's value; otherwise, the option's value is used. The API works at the
granularity of each `adaptiveRebalance` call, while the option applies to the
whole Flink job, and in SQL jobs users can only set it through the option.
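The proposed precedence rule (API value first, then the job-level option) can be sketched as below. The option key {{adaptive-rebalance.max-traverse-size}}, the default of 4, and the {{TraverseSizeResolver}} class are all hypothetical placeholders, not existing Flink options or classes; a plain {{Map}} stands in for the job configuration.

{code:java}
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical sketch: resolve maxTraverseSize, letting the API value override the job option. */
public class TraverseSizeResolver {
    // Hypothetical option key and default; not an existing Flink option.
    static final String OPTION_KEY = "adaptive-rebalance.max-traverse-size";
    static final int DEFAULT_MAX_TRAVERSE_SIZE = 4;

    static int resolve(OptionalInt apiValue, Map<String, String> jobConfig) {
        if (apiValue.isPresent()) {
            // Per-call setting, e.g. adaptiveRebalance(5) on one specific edge.
            return apiValue.getAsInt();
        }
        // Job-wide setting; the only path available to SQL jobs.
        String configured = jobConfig.get(OPTION_KEY);
        return configured != null ? Integer.parseInt(configured.trim()) : DEFAULT_MAX_TRAVERSE_SIZE;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(OPTION_KEY, "10");
        System.out.println(resolve(OptionalInt.of(5), conf));       // 5: API overrides option
        System.out.println(resolve(OptionalInt.empty(), conf));     // 10: falls back to option
        System.out.println(resolve(OptionalInt.empty(), Map.of())); // 4: hypothetical default
    }
}
{code}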
The public API could be defined as follows:
{code:java}
// Proposed DataStream<T> method; a maxTraverseSize passed here overrides the job-level option
public DataStream<T> adaptiveRebalance(int maxTraverseSize)
{code}
[~pnowojski] [~akalash] [~pltbkd] what do you think?
Looking forward to your feedback and suggestions, thanks.
> Adaptive Channel selection for partitioner
> ------------------------------------------
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: tartarus
> Assignee: tartarus
> Priority: Major
>
> In Flink, if the upstream and downstream operators have different
> parallelism, the RebalancePartitioner is used by default to select the
> target channel.
> In our company, users often use Flink to access Redis, HBase, or other RPC
> services. If some operators are slow to return requests (for external
> service reasons), the job easily becomes backpressured, because
> Rebalance/Rescale use a round-robin channel selection policy.
> Since the Rebalance/Rescale policy does not care which downstream subtask the
> data is sent to, we would like Rebalance/Rescale to take the processing
> capacity of each downstream subtask into account when choosing a channel,
> sending more data to idle subtasks to achieve the best possible job
> throughput.
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)