[ 
https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371309#comment-17371309
 ] 

Anton Kalashnikov commented on FLINK-22887:
-------------------------------------------

[~pnowojski], my answers below:

??stats are updated with a delay of two buffers??

Yes, you described it right. NumberOfReceivedBytes is updated in real time 
because that is cheap, but NumberOfSentBytes is updated only when we put the 
next new Buffer into the sub-partition. At that moment we take a snapshot of 
NumberOfSentBytes, and the value does not change until the next put. So right 
after a Buffer is put, the 'LoadRebalancePartitioner' sees the actual value, 
and it sees the most outdated value when the current Buffer is almost full. 
That is why I would say the delay is only one Buffer rather than two.
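
To make the snapshot behaviour concrete, here is a minimal sketch. It is not 
the code from the PR: the class name, the method names, and the idea that the 
load is estimated as received bytes minus the sent-bytes snapshot are all 
assumptions made only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-subpartition statistics; all names are illustrative. */
final class SubpartitionLoadStats {
    // Updated on every record by the writer thread (cheap, always current).
    private long receivedBytes;
    // Assumed to be updated by the thread that sends data downstream.
    private final AtomicLong sentBytes = new AtomicLong();
    // Refreshed only when a new buffer is put into the subpartition.
    private long sentBytesSnapshot;

    void onRecordWritten(int bytes) {
        receivedBytes += bytes;
    }

    void onBytesSent(int bytes) {
        sentBytes.addAndGet(bytes);
    }

    void onNewBufferAdded() {
        // The only place where the partitioner's view of sent bytes changes.
        sentBytesSnapshot = sentBytes.get();
    }

    /** Assumed load estimate: bytes accepted but not yet known to be sent. */
    long estimatedBacklogBytes() {
        return receivedBytes - sentBytesSnapshot;
    }
}
```

In this sketch the estimate is exact right after onNewBufferAdded() and 
drifts until the next call, which matches the "at most one Buffer" delay 
described above.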

??But what's the essence of your {{chooseLessLoadedSubPartition}}? You are 
trying to keep first element of the {{subpartitionStatistics}} to be the least 
loaded subpartition. And once per every record, you are checking in a round 
robin fashion a single other channel if it's the new minimum???

Yes, everything is as you described. The element at index zero is the least 
loaded subpartition, and for each record the algorithm compares only one 
other subpartition with it to keep the complexity constant. As you noted, it 
is not perfect because it only finds a local minimum rather than the globally 
least loaded subpartition, but my experiments show that it works pretty well.
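
The selection step discussed here can be sketched roughly as follows. This 
is only an illustration of the idea, not the actual 
{{chooseLessLoadedSubPartition}} implementation: the class name, the 
channels array, and the load function are hypothetical, and the load is 
assumed to be any number where smaller means less loaded.

```java
import java.util.function.IntToLongFunction;

/** Hypothetical constant-time "least loaded" selector; names are illustrative. */
final class LeastLoadedSelector {
    // channels[0] always holds the best candidate found so far.
    private final int[] channels;
    // Assumed callback returning the current load of a channel.
    private final IntToLongFunction load;
    // Round-robin cursor over positions 1..n-1.
    private int next = 1;

    LeastLoadedSelector(int numberOfChannels, IntToLongFunction load) {
        this.channels = new int[numberOfChannels];
        for (int i = 0; i < numberOfChannels; i++) {
            channels[i] = i;
        }
        this.load = load;
    }

    /** Called once per record; does exactly one comparison. */
    int selectChannel() {
        if (channels.length == 1) {
            return channels[0];
        }
        // Compare the current minimum with a single other channel.
        if (load.applyAsLong(channels[next]) < load.applyAsLong(channels[0])) {
            int tmp = channels[0];
            channels[0] = channels[next];
            channels[next] = tmp;
        }
        // Advance the cursor, skipping position 0.
        next = (next + 1 == channels.length) ? 1 : next + 1;
        return channels[0];
    }
}
```

Because only one candidate is checked per record, the result can lag behind 
the true minimum by up to a full round over the channels, which is the 
imperfection mentioned above.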

 

A specialized {{LoadRebalanceRecordWriter}} may be a good idea. In fact, the 
parts that collect the statistics and pass this information to 
{{LoadRebalancePartitioner}} are not designed properly. That is exactly what 
I want to think about in more detail.

> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-22887
>                 URL: https://issues.apache.org/jira/browse/FLINK-22887
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.1
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{RebalancePartitioner}} uses round-robin to distribute the records, but this 
> may not work as expected, because the environments and the processing ability 
> of the downstream tasks may differ from each other. In such cases, the 
> throughput of the whole job will be limited by the slowest downstream 
> subtask, which is very similar to the "HASH" scenario.
> Instead, now that the credit-based mechanism has been introduced, we can use 
> the {{backlog}} on the sender side to identify the "load" on each receiver 
> side, which helps us distribute the data more fairly in 
> {{RebalancePartitioner}} and {{RescalePartitioner}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
