This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b5aa420180729d8792c5d6feecfeb14382f72ee
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Aug 10 14:49:00 2022 +0800

    [FLINK-28884] Reset needNotify to true when get a zero backlog.
    
    HsSubpartitionView should be notifiable when downstream get a zero backlog. 
Generally speaking, if the backlog is zero, when data become available, even if 
there is no credit, the backlog information will be notified also. However, in 
the hybrid shuffle, the notification will be ignored. This behavior is 
incorrect.
    
    This closes #20529
---
 .../network/partition/hybrid/HsSubpartitionView.java   |  7 ++++++-
 .../partition/hybrid/HsSubpartitionViewTest.java       | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index e40bcbf6b4f..704d52703a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -118,7 +118,12 @@ public class HsSubpartitionView
                     && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
                 availability = true;
             }
-            return new AvailabilityWithBacklog(availability, 
getSubpartitionBacklog());
+
+            int backlog = getSubpartitionBacklog();
+            if (backlog == 0) {
+                needNotify = true;
+            }
+            return new AvailabilityWithBacklog(availability, backlog);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 659984f2f0a..b81373ac11e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -271,6 +271,24 @@ class HsSubpartitionViewTest {
         assertThat(notifyAvailableFuture).isNotCompleted();
     }
 
+    @Test
+    void testGetZeroBacklogNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new 
CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> 
notifyAvailableFuture.complete(null));
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> 
0).build());
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isZero();
+
+        assertThat(notifyAvailableFuture).isNotCompleted();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isCompleted();
+    }
+
     @Test
     void testGetAvailabilityAndBacklogPositiveCredit() {
         HsSubpartitionView subpartitionView = createSubpartitionView();

Reply via email to