[
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cai Liuyang updated FLINK-25664:
--------------------------------
Affects Version/s: 1.14.3
> Notify will be not triggered for PipelinedSubpartition if more than one
> buffer is added during isBlocked == true
> ----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Major
> Labels: pull-request-available
>
> For now, there might be case like:
> # PipelinedSubPartition only have one aligned-chk-barried-buffer (isBlocked
> == false)
> # CreditBasedSequenceNumberingViewReader pool this buffer and
> PipelinedSubPartition become to Blocked (isBlocked == true)
> # Before downStream resumeConsumption, we add two finished-buffer to this
> PipelinedSubPartition (there is no limit for adding buffer to
> blocked-PipelinedSubPartition)
> ## add the first finished-buffer will not notifyDataAvailable because
> isBlocked == true
> ## add the second finished-buffer will also not notifyDataAvailable because
> of isBlocked == true and finishedBuffer > 1
> # DownStream resumeConsumption, PipelinedSubPartition is unblocked
> (isBlocked == false)
> # OutputFlusher call PipelinedSubPartition will not notifyDataAvailable
> because of finishedBuffer > 1
> In conclusion,There are three case we should trigger notifyDataAvailable:
> case1: only have one finished buffer (handled by add)
> case2: only have one unfinished buffer (handled by flush)
> case3: have more than on finished buffer, which is add during
> PipelinedSubPartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
>
> @Test
> public void
> testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
> throws Exception {
> blockSubpartitionByCheckpoint(1);
> subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
> subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
> assertEquals(1, availablityListener.getNumNotifications());
> readView.resumeConsumption();
> subpartition.flush();
> assertEquals(2, availablityListener.getNumNotifications());
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)