[
https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368787#comment-17368787
]
Anton Kalashnikov commented on FLINK-22887:
-------------------------------------------
The ideas I have thought about:
* Using the unsafe 'getBuffersInBacklog' (the same idea [~wind_ljy]
described). It is cheap but too dangerous, and honestly I don't think it is
suitable for production because it would behave unpredictably depending on the
processor, the JVM implementation/version, the load, etc. So it is good for a
POC, and it may make sense to implement it in order to compare it with the
final solution, but I don't think it should be the only implementation.
* The opposite solution: updating the statistics of received and sent records
at the record level. This is expensive because it requires synchronization on
each record read/write, which slows down writing.
* Using the existing synchronization to collect statistics when a buffer is
added to the queue. This is cheap but not very accurate, which in my opinion
is not a big problem (this is the solution I implemented in my branch).
* Using the existing synchronization plus something like a priority queue to
keep the collected statistics sorted (the same idea [~pnowojski] described,
except that I didn't think about credit). This is not as cheap as the previous
option, but it is more accurate at the buffer level. For absolute accuracy the
statistics would have to be recalculated on each record, which is expensive
and, I think, doesn't make sense. This solution is also worth implementing,
but it should be compared with the cheaper one to understand its cost.
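To make the cheap backlog-based idea concrete, here is a minimal sketch of what such a selector could look like. This is a hypothetical illustration, not Flink's actual ChannelSelector API: the class name BacklogAwareSelector and its methods are my own, and the backlog counters are assumed to be fed by whatever sender-side statistics we end up collecting.

```java
// Hypothetical sketch (not Flink's actual ChannelSelector API): pick the
// channel with the smallest approximate backlog instead of plain round-robin.
public class BacklogAwareSelector {

    private final int[] backlog;  // approximate buffers-in-backlog per channel
    private int cursor;           // round-robin cursor used to break ties

    public BacklogAwareSelector(int numChannels) {
        this.backlog = new int[numChannels];
    }

    // Would be fed by the sender-side statistics, e.g. when a buffer is
    // added to (delta > 0) or removed from (delta < 0) a subpartition queue.
    public void updateBacklog(int channel, int delta) {
        backlog[channel] += delta;
    }

    // Select the least-loaded channel, scanning from the round-robin cursor
    // so that equally loaded channels still receive records in turn.
    public int selectChannel() {
        int best = cursor;
        for (int i = 1; i < backlog.length; i++) {
            int candidate = (cursor + i) % backlog.length;
            if (backlog[candidate] < backlog[best]) {
                best = candidate;
            }
        }
        cursor = (best + 1) % backlog.length;
        return best;
    }
}
```

The round-robin cursor matters: with a plain "argmin" scan, equally loaded channels would all collapse onto the lowest index, so ties are broken by continuing the rotation.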
So in my opinion, first of all we should choose or create a benchmark that can
help us check our ideas. Then we can roughly implement a couple of them, which
is actually pretty easy. And then we can decide which implementation offers
the best trade-off.
The benchmarks should answer the following questions:
* The throughput difference between the current RebalancePartitioner and the
RebalancePartitioner in the new code.
* The throughput difference between the current RebalancePartitioner and the
NewPartitioner when subtasks have equal performance.
* The throughput difference between the current RebalancePartitioner and the
NewPartitioner when subtasks have different performance.
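Before touching the real benchmarks, the third question can be eyeballed with a toy simulation. This is purely illustrative (the class name, the drain-rate model, and the max-queue metric are my own assumptions, not Flink code), but it shows why round-robin suffers when one subtask is slower:

```java
// Toy simulation (illustrative only, not Flink's benchmark suite): records are
// distributed to n channels, and each channel drains up to drainRates[i]
// records per step. The returned value is the largest queue length observed,
// a rough proxy for backpressure caused by the slowest subtask.
public class PartitionerSimulation {

    static int simulate(boolean backlogAware, int records, int[] drainRates) {
        int n = drainRates.length;
        int[] queue = new int[n];
        int cursor = 0;
        int maxQueue = 0;
        for (int r = 0; r < records; r++) {
            int target;
            if (backlogAware) {
                // pick the channel with the shortest queue
                target = 0;
                for (int i = 1; i < n; i++) {
                    if (queue[i] < queue[target]) {
                        target = i;
                    }
                }
            } else {
                // plain round-robin, ignoring the load of each channel
                target = cursor;
                cursor = (cursor + 1) % n;
            }
            queue[target]++;
            maxQueue = Math.max(maxQueue, queue[target]);
            // each channel drains at its own rate
            for (int i = 0; i < n; i++) {
                queue[i] = Math.max(0, queue[i] - drainRates[i]);
            }
        }
        return maxQueue;
    }

    public static void main(String[] args) {
        // one stuck subtask (drain rate 0): round-robin keeps feeding it,
        // while backlog-aware selection routes around it
        int[] rates = {1, 1, 0};
        System.out.println("round-robin   max queue: " + simulate(false, 30, rates));
        System.out.println("backlog-aware max queue: " + simulate(true, 30, rates));
    }
}
```

Of course this ignores credit, buffer sizes, and synchronization cost, which is exactly what the real benchmarks need to measure.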
[~wind_ljy], are you already working on this task, or do you not plan to
implement it right now? I mean, I would be happy to take this ticket myself,
but if you have been working on it for a while, it is better if you continue.
> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
> Key: FLINK-22887
> URL: https://issues.apache.org/jira/browse/FLINK-22887
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.13.1
> Reporter: Jiayi Liao
> Priority: Major
>
> {{RebalancePartitioner}} uses round-robin to distribute the records, but this
> may not work as expected, because the environments and the processing
> abilities of the downstream tasks may differ from each other. In such cases,
> the throughput of the whole job is limited by the slowest downstream subtask,
> which is very similar to the "HASH" scenario.
> Instead, now that the credit-based mechanism has been introduced, we can
> leverage the {{backlog}} on the sender side to identify the "load" on each
> receiver side, which helps us distribute the data more fairly in
> {{RebalancePartitioner}} and {{RescalePartitioner}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)