[
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739998#comment-16739998
]
zhijiang commented on FLINK-11082:
----------------------------------
[~pnowojski], thanks for the reply!
It might not make sense to increase the backlog on adding a new buffer based on
whether the current queue is empty or not. There are two concerns:
1. If the queue contains only event buffers, the backlog should not be increased
when the first data buffer is added. So another boolean flag would be needed to
indicate whether the added data buffer is the first one or not.
2. Take another case, ignoring the flusher and event factors:
* The writer adds the first buffer to the queue; the backlog stays 0.
* The writer adds the second buffer; the backlog increases to 1, and data
availability is notified for transport.
* The netty thread polls the first buffer from the queue, decreasing the backlog
to 0. Note that the second buffer may already have been finished by the writer
while the third buffer has not yet been added to the queue. The subpartition is
therefore still available for transport, and the backlog is 0 at this point.
* The netty thread polls the second buffer, decreasing the backlog to -1. Only
after that is the third buffer added, bringing the backlog back to 0 with a
single unfinished buffer left in the queue. So the backlog can be -1 at times.
In theory the backlog should be increased as soon as a buffer is finished by the
writer, but with this scheme the increase is delayed until the next buffer is
added. Before that increase happens, the netty thread can already see the
finished buffer and decrease the backlog, which causes the problem above.
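To make the race concrete, here is a minimal toy model (a purely illustrative
class, not the real {{PipelinedSubpartition}}) of the delayed-increase
accounting that replays the trace above:
{code:java}
import java.util.ArrayDeque;

/**
 * Toy model of the "delayed increase" scheme: the backlog for a finished buffer is
 * only increased when the next buffer is added, while the consuming (netty) thread
 * decreases it on every poll. Replaying the four steps above shows the counter
 * temporarily dropping to -1.
 */
public class BacklogRaceSketch {

    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private int backlog;
    private boolean firstBuffer = true;

    void add(String buffer) {
        if (!firstBuffer) {
            backlog++; // counts the previously added buffer, which is finished by now
        }
        firstBuffer = false;
        queue.add(buffer);
    }

    void poll() {
        queue.poll();
        backlog--; // the netty thread decreases the backlog for every polled buffer
    }

    public static void main(String[] args) {
        BacklogRaceSketch p = new BacklogRaceSketch();
        p.add("buffer1"); // backlog = 0
        p.add("buffer2"); // backlog = 1, data availability is notified
        p.poll();         // netty thread takes buffer1, backlog = 0
        // buffer2 is already finished by the writer, but buffer3 has not been added yet
        p.poll();         // netty thread takes buffer2
        System.out.println("after polling buffer2: backlog = " + p.backlog); // -1
        p.add("buffer3"); // the delayed increase for buffer2 arrives only now
        System.out.println("after adding buffer3:  backlog = " + p.backlog); // 0
    }
}
{code}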
If the flusher factor is also considered, the conditions become even more
complicated. I had considered some other ways before. One way to solve the above
issue is to introduce another {{notifyBufferBuilderFinished}} method in the
{{ResultPartitionWriter}} interface. In {{RecordWriter}},
{{notifyBufferBuilderFinished}} should be called first, before calling
{{BufferBuilder#finish()}}, and the backlog is increased inside the
{{notifyBufferBuilderFinished}} implementation. In other words, the backlog would
always be increased first and only decreased afterwards.
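Roughly, the call order I have in mind looks like the following sketch (the
types below are simplified stand-ins for the real {{RecordWriter}} /
{{ResultPartitionWriter}} / {{BufferBuilder}}; {{notifyBufferBuilderFinished}}
is only the proposed addition, not existing API):
{code:java}
// Simplified stand-ins, only to illustrate the proposed call order.
interface BufferBuilder {
    void finish();
}

interface ResultPartitionWriter {
    // proposed new method: increase the backlog for the buffer about to be finished
    void notifyBufferBuilderFinished(int targetChannel);
}

class RecordWriterSketch {
    private final ResultPartitionWriter targetPartition;
    private final BufferBuilder[] bufferBuilders;

    RecordWriterSketch(ResultPartitionWriter targetPartition, int numberOfChannels) {
        this.targetPartition = targetPartition;
        this.bufferBuilders = new BufferBuilder[numberOfChannels];
    }

    void tryFinishCurrentBufferBuilder(int targetChannel) {
        BufferBuilder bufferBuilder = bufferBuilders[targetChannel];
        if (bufferBuilder == null) {
            return;
        }
        bufferBuilders[targetChannel] = null;
        // Increase first: the netty thread can only ever see a finished buffer whose
        // backlog increase has already happened, so the counter never goes negative.
        targetPartition.notifyBufferBuilderFinished(targetChannel);
        bufferBuilder.finish();
    }
}
{code}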
Further considering the flusher factor in this proposal:
* When a flush is triggered, whether to increase the backlog depends on whether
the last buffer is finished. If there are 2 buffers in the queue and the second
one is already finished, the backlog stays 2 when the flush is triggered. If the
second buffer is not finished, the flush increases the backlog from 1 to 2.
* On {{notifyBufferBuilderFinished}}, if {{flushRequested}} is true, the backlog
should not be increased because the flush already did that before; otherwise it
is increased. E.g. when the second buffer is finished and a flush was already
triggered before, the backlog should still be 2.
* When a new buffer is added to the queue and {{flushRequested}} is true, the
backlog should be increased to reflect the flush. E.g. when the third buffer is
added while {{flushRequested}} is true, the backlog should increase from 2 to 3.
* When the last buffer in the queue is peeked, no matter whether it is finished
or not, the backlog becomes 0 and {{flushRequested}} becomes false.
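Putting these rules together, a rough sketch (again a simplified toy model, not
the real {{PipelinedSubpartition}}, ignoring events and spilling, and assuming
the writer calls {{notifyBufferBuilderFinished}} as proposed) could look like:
{code:java}
import java.util.ArrayDeque;

/** Toy model of the four backlog rules above. */
class SubpartitionBacklogSketch {

    static final class Entry {
        boolean finished;
    }

    private final ArrayDeque<Entry> buffers = new ArrayDeque<>();
    private int buffersInBacklog;
    private boolean flushRequested;

    synchronized void flush() {
        // Rule 1: a flush only counts the last buffer if it is still unfinished;
        // a finished last buffer was already counted via notifyBufferBuilderFinished().
        if (!flushRequested && !buffers.isEmpty() && !buffers.peekLast().finished) {
            buffersInBacklog++;
        }
        flushRequested = true;
    }

    synchronized void notifyBufferBuilderFinished() {
        // Rule 2: if a flush already counted this buffer, do not count it again.
        if (!flushRequested) {
            buffersInBacklog++;
        }
        if (!buffers.isEmpty()) {
            buffers.peekLast().finished = true;
        }
    }

    synchronized void add(Entry entry) {
        // Rule 3: while a flush is in force, a newly added buffer is immediately consumable.
        if (flushRequested) {
            buffersInBacklog++;
        }
        buffers.add(entry);
    }

    synchronized Entry poll() {
        Entry entry = buffers.poll();
        if (entry == null) {
            return null;
        }
        if (buffers.isEmpty()) {
            // Rule 4: the consumer has reached the last buffer; nothing beyond it is
            // available, so the backlog is 0 and the pending flush is used up.
            buffersInBacklog = 0;
            flushRequested = false;
        } else {
            buffersInBacklog--;
        }
        return entry;
    }
}
{code}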
This makes sense for {{PipelinedSubpartition}}. But it introduces a separate
synchronization point for increasing the backlog, whereas the current increase
already happens within the synchronization of adding a buffer. For
{{SpillableSubpartition}} I would need to think it through a bit more.
> Increase backlog only if it is available for consumption
> --------------------------------------------------------
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
>
> The backlog should indicate how many buffers are available in the subpartition
> for the downstream side to consume. Availability comes from two factors: either
> the {{BufferConsumer}} is finished, or a flush has been triggered.
> In the current implementation, the backlog is increased as soon as a
> {{BufferConsumer}} is added to the subpartition, even though that
> {{BufferConsumer}} may not yet be available for network transport.
> Furthermore, the backlog affects how many floating buffers are requested on the
> downstream side. Some floating buffers may be fetched in advance but not used
> for a long time, so they are not used efficiently.
> We found this scenario to be especially pronounced with the rebalance selector
> on the upstream side, so we want to increase the backlog only when a
> {{BufferConsumer}} is finished or a flush is triggered.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)