[ https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368787#comment-17368787 ]

Anton Kalashnikov commented on FLINK-22887:
-------------------------------------------

The ideas which I have thought about:
 * Using the unsafe 'getBuffersInBacklog' (the same idea which [~wind_ljy] 
described) - it is cheap but too dangerous and, honestly, I think it is not for 
production, because it will behave unpredictably depending on the processor, 
the JVM implementation/version, the load, etc. So it is good for a POC, and 
maybe it makes sense to implement it in order to compare it with the final 
solution, but I don't think it should be the only implementation (a rough 
sketch of this kind of backlog-based selection follows the list).
 * The opposite solution - updating the statistics of received and sent records 
at the record level. It is expensive because it requires synchronization on 
each record read/write, which slows down writing, and that is not good.
 * Using the existing synchronization to collect statistics when the buffer is 
added to the queue. It is cheap but not very accurate, which in my opinion is 
not a big problem (this is the solution I implemented in my branch).
 * Using the existing synchronization plus something like a priority queue to 
collect the statistics in sorted order (the same idea which [~pnowojski] 
described, only I didn't think about credit). It is not as cheap as the 
previous one, but it is more accurate at the buffer level. For absolute 
accuracy it would have to be recalculated on each record, which is expensive 
and, I think, doesn't make sense. This solution is also worth implementing, but 
it should be compared with the cheaper solution to understand the price.
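
To make the first idea a bit more concrete, here is a minimal sketch of what a 
backlog-aware channel selection could look like. It is not Flink code: 
'ChannelLoadView', 'getBuffersInBacklog(int)' and 'BacklogAwareSelector' are 
hypothetical names, and the backlog values are assumed to come from an 
unsynchronized read, so they may be stale.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class BacklogAwareSelector {

    /** Hypothetical view onto the per-channel backlog; reads may be stale. */
    public interface ChannelLoadView {
        int numberOfChannels();
        int getBuffersInBacklog(int channel); // unsynchronized, approximate read
    }

    private final ChannelLoadView load;
    private int roundRobinIndex;

    public BacklogAwareSelector(ChannelLoadView load) {
        this.load = load;
        this.roundRobinIndex = ThreadLocalRandom.current().nextInt(load.numberOfChannels());
    }

    /** Picks the channel with the smallest observed backlog; ties rotate round-robin. */
    public int selectChannel() {
        final int channels = load.numberOfChannels();
        int best = roundRobinIndex;
        int bestBacklog = Integer.MAX_VALUE;
        // Start the scan at the round-robin pointer so ties do not always hit channel 0.
        for (int offset = 0; offset < channels; offset++) {
            int channel = (roundRobinIndex + offset) % channels;
            int backlog = load.getBuffersInBacklog(channel); // possibly stale value
            if (backlog < bestBacklog) {
                bestBacklog = backlog;
                best = channel;
            }
        }
        roundRobinIndex = (roundRobinIndex + 1) % channels;
        return best;
    }
}
{code}

The unsafe variant and the queue-based variants would only differ in where 
'getBuffersInBacklog' gets its numbers from; the selection logic itself could 
stay the same.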


So, in my opinion, first of all we should choose or create a benchmark which 
can help us check these ideas. Then we can roughly implement a couple of them, 
which is actually pretty easy. And then we can decide which implementation has 
the best tradeoff.

 

Benchmarks should answer the following questions (a rough simulation sketch 
follows the list):
 * Throughput difference between the current RebalancePartitioner and the 
RebalancePartitioner in the new code.
 * Throughput difference between the current RebalancePartitioner and the 
NewPartitioner when the subtasks have equal performance.
 * Throughput difference between the current RebalancePartitioner and the 
NewPartitioner when the subtasks have different performance.
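
To get a first feeling for the last two questions before writing a proper 
flink-benchmarks case, one could start with a toy simulation like the one 
below. Everything here is made up for illustration (class names, drain rates, 
queue capacity); it only models back-pressure as a full queue, not the real 
network stack.

{code:java}
public class PartitionerSimulation {

    interface Selector {
        int select(int[] queueSizes, int recordIndex);
    }

    /** Round-robin: ignores the queue sizes completely. */
    static final Selector ROUND_ROBIN = (queues, idx) -> idx % queues.length;

    /** Backlog-aware: always picks the currently shortest queue. */
    static final Selector SHORTEST_QUEUE = (queues, idx) -> {
        int best = 0;
        for (int i = 1; i < queues.length; i++) {
            if (queues[i] < queues[best]) {
                best = i;
            }
        }
        return best;
    };

    /**
     * Each tick every subtask drains up to its rate, then the producer tries to
     * emit 'recordsPerTick' records; a record is rejected (back-pressure) when
     * the chosen queue is full. Returns the number of accepted records.
     */
    static long simulate(Selector selector, int[] drainRates, int capacity,
                         int recordsPerTick, int ticks) {
        int[] queues = new int[drainRates.length];
        long accepted = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < queues.length; i++) {
                queues[i] = Math.max(0, queues[i] - drainRates[i]);
            }
            for (int r = 0; r < recordsPerTick; r++) {
                int channel = selector.select(queues, t * recordsPerTick + r);
                if (queues[channel] < capacity) {
                    queues[channel]++;
                    accepted++;
                }
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        int[] unevenRates = {5, 5, 5, 1}; // one slow subtask
        System.out.println("round-robin   : "
                + simulate(ROUND_ROBIN, unevenRates, 10, 12, 100_000));
        System.out.println("shortest-queue: "
                + simulate(SHORTEST_QUEUE, unevenRates, 10, 12, 100_000));
    }
}
{code}

With equal drain rates both selectors should accept roughly the same number of 
records, so the same harness also covers the "equal performance" question.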

 

[~wind_ljy], are you already working on this task, or do you not plan to 
implement it right now? I mean, I would be happy to propose myself for this 
ticket, but if you have been working on this task for a while, it is better if 
you continue with it.

> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-22887
>                 URL: https://issues.apache.org/jira/browse/FLINK-22887
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.1
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{RebalancePartitioner}} uses round-robin to distribute the records, but this 
> may not work as expected, because the environments and the processing ability 
> of the downstream tasks may differ from each other. In such cases, the 
> throughput of the whole job will be limited by the slowest downstream 
> subtask, which is very similar to the "HASH" scenario.
> Instead, after the credit-based mechanism is introduced, we can leverage the 
> {{backlog}} on the sender side to identify the "load" on each receiver side, 
> which helps us distribute the data more fairly in {{RebalancePartitioner}} and 
> {{RescalePartitioner}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)