[
https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369499#comment-17369499
]
Piotr Nowojski commented on FLINK-22887:
----------------------------------------
Thanks for the answers.
Regarding the benchmarks, it should be easy to add such coverage to our
existing benchmarks (in
{{org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmark#setUp(int,
int, int, boolean, boolean, int, int,
org.apache.flink.configuration.Configuration)}} just pick a different
{{ChannelSelector}} via {{RecordWriterBuilder#setChannelSelector}}).
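To illustrate what gets swapped in the benchmark, here is a minimal sketch. The interface below is a simplified stand-in of my own, not the real Flink {{ChannelSelector}} (which has more methods and different generics); only the method names mirror Flink's, and the round-robin logic matches what the rebalance partitioner does today.

```java
// Simplified stand-in for Flink's ChannelSelector (hypothetical types,
// only the method names mirror the real interface).
interface SimpleChannelSelector<T> {
    void setup(int numberOfChannels);

    int selectChannel(T record);
}

// Plain round-robin selection, the baseline behaviour the benchmark
// would compare a backlog-aware selector against.
class RoundRobinSelector<T> implements SimpleChannelSelector<T> {
    private int numberOfChannels;
    private int nextChannel = -1;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        // Cycle 0, 1, ..., numberOfChannels - 1, 0, ...
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}
```

In the actual benchmark the alternative selector would be passed through {{RecordWriterBuilder#setChannelSelector}} in place of this baseline.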
What happens in your proposal [~wind_ljy] if all partitions exceed
{{backlogCount > 2}}? Do you loop through all of them? Do you block, or select
the one with the smallest backlog? I'm also not sure how a solution that uses
unsynchronised accesses would behave, and that worries me: it might work
perfectly in all of our tests and then suddenly stop working at all, perhaps in
some environments we haven't tested.
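To make the question concrete, here is a hypothetical sketch (my own names and logic, not anything from the proposal) of one possible answer: round-robin over channels whose backlog is within the threshold, falling back to the least-loaded channel when all exceed it.

```java
// Hypothetical sketch, not Flink code: one possible behaviour when every
// subpartition exceeds the backlog threshold.
class BacklogSelectorSketch {
    static final int BACKLOG_THRESHOLD = 2;

    private int roundRobinIndex = -1;

    // Round-robin over channels whose backlog is within the threshold;
    // if all channels exceed it, fall back to the smallest backlog.
    int select(int[] backlogs) {
        int n = backlogs.length;
        for (int i = 1; i <= n; i++) {
            int candidate = (roundRobinIndex + i) % n;
            if (backlogs[candidate] <= BACKLOG_THRESHOLD) {
                roundRobinIndex = candidate;
                return candidate;
            }
        }
        // All channels are over the threshold: pick the least loaded one.
        int min = 0;
        for (int i = 1; i < n; i++) {
            if (backlogs[i] < backlogs[min]) {
                min = i;
            }
        }
        roundRobinIndex = min;
        return min;
    }
}
```

Whether the fallback should instead block (back-pressure) is exactly the design question above.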
[~akalashnikov], I took a brief look at your proposal and kudos for the
{{writingThreadTotalNumberOfSentBytes}} hack. It reminds me of the
{{BufferConsumer.CachedPositionMarker#cachedPosition}}. As I understand it,
it's fully accurate, without any additional synchronisation overhead, but stats
are updated with a delay of two buffers (the number of sent bytes reaches the
{{LoadRebalancePartitioner}} only after one buffer is sent AND then we also
need to wait for another buffer to be enqueued in a sub-partition)? But what's
the essence of your
{{chooseLessLoadedSubPartition}}? You are trying to keep the first element of
the {{subpartitionStatistics}} as the least loaded subpartition. And once per
record, you check a single other channel, in round-robin fashion, to see
whether it's the new minimum? I think I like this better than my idea with
{{AddCredit}}. It probably should work well enough, with zero synchronisation
overheads, while still being thread safe. About the implementation, I'm not
sure, but maybe it would be better to implement a specialised
{{LoadRebalanceRecordWriter}} instead of exposing
{{getSubpartitionsStatistics}} and adding {{optionalSetup}}.
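The lazy min-tracking scheme described above can be sketched roughly like this (names and structure are my own, not the proposal's code): remember the currently least-loaded subpartition, and on every record probe just one other channel in round-robin order, adopting it as the new minimum if it is less loaded. A stale read only delays the switch by a few records, so no synchronisation is needed.

```java
// Hypothetical sketch of the O(1)-per-record lazy minimum tracking
// described above (not the proposal's actual implementation).
class LeastLoadedTracker {
    private final long[] pendingBytes; // approximate per-subpartition load
    private int minIndex = 0;          // believed least-loaded subpartition
    private int probeIndex = 0;        // next channel to compare against

    LeastLoadedTracker(int numSubpartitions) {
        this.pendingBytes = new long[numSubpartitions];
    }

    // Called once per record: refreshes the cached minimum with a single
    // probe, then charges the record to the chosen subpartition.
    int chooseSubpartition(long recordBytes) {
        probeIndex = (probeIndex + 1) % pendingBytes.length;
        if (pendingBytes[probeIndex] < pendingBytes[minIndex]) {
            minIndex = probeIndex;
        }
        pendingBytes[minIndex] += recordBytes;
        return minIndex;
    }

    // In a real implementation this would be fed by the sender's
    // statistics (e.g. bytes drained from the sub-partition).
    void onBytesSent(int subpartition, long bytes) {
        pendingBytes[subpartition] -= bytes;
    }
}
```

Since each call does one comparison, the minimum can lag behind by at most one record per channel, which is the "well enough, zero synchronisation overhead" trade-off.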
> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
> Key: FLINK-22887
> URL: https://issues.apache.org/jira/browse/FLINK-22887
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.13.1
> Reporter: Jiayi Liao
> Priority: Major
>
> {{RebalancePartitioner}} uses round-robin to distribute the records but this
> may not work as expected, because the environments and the processing ability
> of the downstream tasks may differ from each other. In such cases, the
> throughput of the whole job will be limited by the slowest downstream
> subtask, which is very similar to the "HASH" scenario.
> Instead, after the credit-based mechanism is introduced, we can leverage the
> {{backlog}} on the sender side to identify the "load" on each receiver side,
> which helps us distribute the data more fairly in {{RebalancePartitioner}} and
> {{RescalePartitioner}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)