This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0b5aa420180729d8792c5d6feecfeb14382f72ee Author: Weijie Guo <[email protected]> AuthorDate: Wed Aug 10 14:49:00 2022 +0800 [FLINK-28884] Reset needNotify to true when get a zero backlog. HsSubpartitionView should be notifiable when downstream get a zero backlog. Generally speaking, if the backlog is zero, when data become available, even if there is no credit, the backlog information will be notified also. However, in the hybrid shuffle, the notification will be ignored. This behavior is incorrect. This closes #20529 --- .../network/partition/hybrid/HsSubpartitionView.java | 7 ++++++- .../partition/hybrid/HsSubpartitionViewTest.java | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java index e40bcbf6b4f..704d52703a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java @@ -118,7 +118,12 @@ public class HsSubpartitionView && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) { availability = true; } - return new AvailabilityWithBacklog(availability, getSubpartitionBacklog()); + + int backlog = getSubpartitionBacklog(); + if (backlog == 0) { + needNotify = true; + } + return new AvailabilityWithBacklog(availability, backlog); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java index 659984f2f0a..b81373ac11e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java @@ -271,6 +271,24 @@ class HsSubpartitionViewTest { assertThat(notifyAvailableFuture).isNotCompleted(); } + @Test + void testGetZeroBacklogNeedNotify() { + CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); + HsSubpartitionView subpartitionView = + createSubpartitionView(() -> notifyAvailableFuture.complete(null)); + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + subpartitionView.setDiskDataView( + TestingHsDataView.builder().setGetBacklogSupplier(() -> 0).build()); + + AvailabilityWithBacklog availabilityAndBacklog = + subpartitionView.getAvailabilityAndBacklog(0); + assertThat(availabilityAndBacklog.getBacklog()).isZero(); + + assertThat(notifyAvailableFuture).isNotCompleted(); + subpartitionView.notifyDataAvailable(); + assertThat(notifyAvailableFuture).isCompleted(); + } + @Test void testGetAvailabilityAndBacklogPositiveCredit() { HsSubpartitionView subpartitionView = createSubpartitionView();
