[ 
https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369499#comment-17369499
 ] 

Piotr Nowojski commented on FLINK-22887:
----------------------------------------

Thanks for the answers.

Regarding the benchmarks, it should be easy to add such coverage to our 
existing benchmarks: in 
{{org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmark#setUp(int,
 int, int, boolean, boolean, int, int, 
org.apache.flink.configuration.Configuration)}}, just pick a different 
{{ChannelSelector}} via {{RecordWriterBuilder#setChannelSelector}}.

What happens in your proposal [~wind_ljy] if all partitions exceed 
{{backlogCount > 2}}? Do you loop through all of them? Do you block, or 
select the one with the smallest backlog? I'm also not sure how a solution 
using unsynchronised accesses would behave. I would be afraid of something 
like that: it might work perfectly in all of our tests, but then suddenly 
stop working entirely, perhaps in some environments we haven't tested.
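
To make sure I understand the alternative I'm asking about, here is a rough 
sketch of the "pick the smallest backlog" fallback. All names and the 
threshold handling here are my own interpretation, not your patch:

```java
// Sketch of one possible fallback, NOT the actual proposal: stick to
// round-robin while the candidate's backlog is within the threshold,
// otherwise fall back to the subpartition with the smallest backlog
// instead of blocking.
public class BacklogFallbackSketch {

    static int selectSubpartition(long[] backlogs, int roundRobinCandidate, long threshold) {
        if (backlogs[roundRobinCandidate] <= threshold) {
            // Normal path: the round-robin choice is not overloaded.
            return roundRobinCandidate;
        }
        // Fallback: scan for the smallest backlog (O(n), but only paid
        // when the round-robin candidate is already overloaded).
        int best = 0;
        for (int i = 1; i < backlogs.length; i++) {
            if (backlogs[i] < backlogs[best]) {
                best = i;
            }
        }
        return best;
    }
}
```

The interesting question is exactly the one above: whether the O(n) scan per 
overloaded record is acceptable, and what the reads of {{backlogs}} look like 
without synchronisation.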

[~akalashnikov], I took a brief look at your proposal and kudos for the 
{{writingThreadTotalNumberOfSentBytes}} hack. It reminds me of 
{{BufferConsumer.CachedPositionMarker#cachedPosition}}. As I understand it, 
it's fully accurate, without any additional synchronisation overhead, but the 
stats are updated with a delay of two buffers (the number of sent bytes 
reaches the {{LoadRebalancePartitioner}} only after sending one buffer, AND 
then we also need to wait for another buffer to be enqueued in a 
sub-partition)? But what's the essence of your 
{{chooseLessLoadedSubPartition}}? You are trying to keep the first element of 
{{subpartitionStatistics}} as the least loaded subpartition, and once per 
record you check a single other channel, in round-robin fashion, to see if 
it's the new minimum? I think I like this better than my idea with 
{{AddCredit}}. It should probably work well enough, with zero synchronisation 
overhead, while still being thread safe. About the implementation I'm not 
sure, but maybe it would be better to implement a specialised 
{{LoadRebalanceRecordWriter}} instead of exposing 
{{getSubpartitionsStatistics}} and adding {{optionalSetup}}.
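
If I'm reading {{chooseLessLoadedSubPartition}} right, the idea could be 
sketched roughly like this. All names below are mine, and I'm assuming the 
backlog counters are plain per-subpartition byte counts updated from the 
stats side:

```java
// Sketch of my reading of the idea (names are mine): keep a "current
// minimum" subpartition, and once per record compare it against one
// other subpartition chosen round-robin. A stale or racy read of a
// backlog counter only delays convergence to the real minimum; it
// never breaks correctness, so no synchronisation is needed on the
// hot path.
public class LeastLoadedSketch {

    private final long[] backlogBytes; // updated from the stats side, read racily here
    private int currentMin;            // index believed to be the least loaded
    private int probe;                 // round-robin probe cursor

    public LeastLoadedSketch(int numberOfSubpartitions) {
        this.backlogBytes = new long[numberOfSubpartitions];
        this.currentMin = 0;
        this.probe = 0;
    }

    // Writer thread, once per record: O(1), a single extra comparison.
    public int chooseLessLoadedSubPartition() {
        probe = (probe + 1) % backlogBytes.length;
        if (backlogBytes[probe] < backlogBytes[currentMin]) {
            currentMin = probe;
        }
        return currentMin;
    }

    // Stats side: hypothetical hook for when bytes are enqueued
    // (a matching negative delta would be applied on consumption).
    public void addBacklog(int subpartition, long bytes) {
        backlogBytes[subpartition] += bytes;
    }
}
```

With only one probe per record the selector can lag behind the true minimum 
for up to {{numberOfSubpartitions}} records, which is probably fine given the 
two-buffer stats delay discussed above.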

> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-22887
>                 URL: https://issues.apache.org/jira/browse/FLINK-22887
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.1
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{RebalancePartitioner}} uses round-robin to distribute the records, but this 
> may not work as expected, because the environments and the processing ability 
> of the downstream tasks may differ from each other. In such cases, the 
> throughput of the whole job will be limited by the slowest downstream 
> subtask, which is very similar to the "HASH" scenario.
> Instead, after the credit-based mechanism is introduced, we can leverage the 
> {{backlog}} on the sender side to identify the "load" on each receiver side, 
> which helps us distribute the data more fairly in {{RebalancePartitioner}} and 
> {{RescalePartitioner}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
