[
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711950#comment-17711950
]
Anton Kalashnikov commented on FLINK-31655:
-------------------------------------------
??I wonder why the implementation in the pull request could impact
significantly??
It wasn't a significant impact. As I remember, there were two sensitive
microbenchmarks which showed a drop of about 2-3%. Maybe we could even accept
it, but we haven't discussed it properly.
??I looked at your implementation of LoadBasedRecordWriter. If I haven't missed
any details, SubpartitionStatistic is only updated at the time of emit; this
statistic does not represent the true computing power of downstream operators??
Yes, you are absolutely right that it is updated only at the time of emit, which
is not ideal. But it was a small POC; I just tried to show that it is possible to
do this without extra locks.
The logic in my current PR (sketched below):
* During the emit we take `currentChannel` and `nextChannel` (`currentChannel` + 1)
* If `bytesInQueue(currentChannel)` > `bytesInQueue(nextChannel)`, then we set
`currentChannel` to `nextChannel`, otherwise we leave `currentChannel` as is
As was already noticed, this solution can get stuck on one channel forever,
since it never updates `bytesInQueue` for any channel but the current one.
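To make the comparison concrete, here is a minimal sketch of that selection step. The class and field names are mine, not from the PR, and how `bytesInQueue` is actually maintained is left out:

```java
// Minimal sketch of the current-PR selection step, not the actual Flink code.
// The bytesInQueue counters are assumed to be updated elsewhere on emit.
public class GreedyChannelSelector {

    private final long[] bytesInQueue; // cached backlog per channel (illustrative)
    private int currentChannel;

    public GreedyChannelSelector(int numberOfChannels) {
        this.bytesInQueue = new long[numberOfChannels];
    }

    /** Called on every emit: switch to the neighbour only if its queue is smaller. */
    public int selectChannel() {
        int nextChannel = (currentChannel + 1) % bytesInQueue.length;
        if (bytesInQueue[currentChannel] > bytesInQueue[nextChannel]) {
            currentChannel = nextChannel;
        }
        return currentChannel;
    }
}
```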
The improved logic can look like this (see the second sketch below):
* We have `roundRobinChannel` (a channel index which is increased by 1 on every emit)
* During the emit we increase `roundRobinChannel` by 1
* We update `bytesInQueue` for `roundRobinChannel`
* Now, if `bytesInQueue(currentChannel)` > `bytesInQueue(roundRobinChannel)`,
then we set `currentChannel` to `roundRobinChannel`, otherwise we leave
`currentChannel` as is
So if we have N channels, each channel's statistic will be refreshed roughly with
every N-th record, which I think is accurate enough for our goal. But it is not
yet clear how to update `bytesInQueue` for `roundRobinChannel` cheaply.
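Roughly, as an illustrative sketch (again, names are mine, not Flink API; `readBacklog` stands for whatever cheap queue-size accessor we end up with, which is exactly the open question):

```java
// Sketch of the improved logic: a round-robin pointer refreshes the cached backlog
// of one extra channel per emit, so with N channels every channel is re-read
// roughly once every N records.
public abstract class AdaptiveChannelSelector {

    private final long[] bytesInQueue; // locally cached backlog per channel
    private int currentChannel;
    private int roundRobinChannel;

    protected AdaptiveChannelSelector(int numberOfChannels) {
        this.bytesInQueue = new long[numberOfChannels];
    }

    public int selectChannel() {
        // Advance the round-robin pointer and refresh its cached backlog.
        roundRobinChannel = (roundRobinChannel + 1) % bytesInQueue.length;
        bytesInQueue[roundRobinChannel] = readBacklog(roundRobinChannel);

        // Switch only if the freshly refreshed channel looks less loaded.
        if (bytesInQueue[currentChannel] > bytesInQueue[roundRobinChannel]) {
            currentChannel = roundRobinChannel;
        }
        return currentChannel;
    }

    /** The cheap backlog read is the open question; left abstract on purpose. */
    protected abstract long readBacklog(int channel);
}
```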
??Solution 1??
I still wonder about the implementation of getting the buffer queue sizes, since
that is the tricky part. I solved this problem somehow in my PR, but there are
still open questions there.
Like [~pnowojski], I also have concerns about the efficiency of this
solution, since it is O(ChannelsCount), which doesn't look optimal. But we can
optimize that algorithm to something like what I described above, or similar.
??Solution 2??
I doubt that we will be ready to accept extra synchronization on the hot path,
especially in code common to all partitioners. But of course, we can still
discuss it, though only with more specific implementation ideas.
In conclusion, let's proceed with the FLIP. And we can think about:
* How to get the channel queue size efficiently
* How to calculate the next channel efficiently
> Adaptive Channel selection for partitioner
> ------------------------------------------
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: tartarus
> Assignee: tartarus
> Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the
> same, then by default the RebalancePartitioner will be used to select the
> target channel.
> In our company, users often use Flink to access Redis, HBase, or other RPC
> services. If some of the operators are slow to return requests (for external
> service reasons), then, because Rebalance/Rescale use a round-robin channel
> selection policy, the job easily runs into backpressure.
> Because the Rebalance/Rescale policy does not care which downstream subtask the
> data is sent to, we expect Rebalance/Rescale to take the processing power of the
> downstream subtasks into account when choosing a channel.
> Sending more data to the free subtasks ensures the best possible throughput for
> the job!
>
>
>