This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71c9760322ea8002b7e1af8d93c604287bd098ca
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Dec 13 11:00:45 2023 +0800

    [FLINK-34225][runtime] Fix the race condition in UnionResultSubpartitionView
---
 .../partition/UnionResultSubpartitionView.java     | 74 +++++++++++++++++-----
 .../partition/UnionResultSubpartitionViewTest.java | 20 ++++++
 2 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
index cede2da3219..eeb28fd0888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
@@ -28,10 +28,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.Set;
 
 /**
  * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides
@@ -55,9 +58,11 @@ public class UnionResultSubpartitionView
     private final Object lock = new Object();
 
     /** All the {@link ResultSubpartitionView}s managed by this class. */
+    @GuardedBy("lock")
     private final BiMap<Integer, ResultSubpartitionView> allViews = HashBiMap.create();
 
     /** All the {@link ResultSubpartitionView}s that have data available. */
+    @GuardedBy("lock")
     private final SubpartitionSelector<ResultSubpartitionView> availableViews =
             new RoundRobinSubpartitionSelector<>();
 
@@ -68,11 +73,23 @@ public class UnionResultSubpartitionView
      * where each buffer comes from. Cache is used to provide the data type of the next buffer and
      * an estimation of the backlog, as required by {@link ResultSubpartition.BufferAndBacklog}.
      */
+    @GuardedBy("lock")
     private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
             new LinkedList<>();
 
+    /**
+     * A collection storing views that have triggered {@link
+     * #notifyDataAvailable(ResultSubpartitionView)} without {@link #notifyViewCreated(int,
+     * ResultSubpartitionView)}. This is used to resolve the race condition between these two
+     * methods.
+     */
+    @GuardedBy("lock")
+    private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>();
+
+    @GuardedBy("lock")
     private boolean isReleased;
 
+    @GuardedBy("lock")
     private int sequenceNumber;
 
     public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener) {
@@ -82,7 +99,12 @@ public class UnionResultSubpartitionView
     }
 
     public void notifyViewCreated(int subpartitionId, ResultSubpartitionView view) {
-        allViews.put(subpartitionId, view);
+        synchronized (lock) {
+            allViews.put(subpartitionId, view);
+            if (unregisteredAvailableViews.remove(view)) {
+                notifyDataAvailable(view);
+            }
+        }
     }
 
     @Override
@@ -148,6 +170,11 @@ public class UnionResultSubpartitionView
     @Override
     public void notifyDataAvailable(ResultSubpartitionView view) {
         synchronized (lock) {
+            if (!allViews.containsValue(view)) {
+                unregisteredAvailableViews.add(view);
+                return;
+            }
+
             if (!availableViews.notifyDataAvailable(view) || !cachedBuffers.isEmpty()) {
                 // The availabilityListener has already been notified.
                 return;
@@ -174,22 +201,31 @@ public class UnionResultSubpartitionView
 
     @Override
     public void releaseAllResources() throws IOException {
-        for (ResultSubpartitionView view : allViews.values()) {
-            view.releaseAllResources();
-        }
-        allViews.clear();
+        synchronized (lock) {
+            for (ResultSubpartitionView view : allViews.values()) {
+                view.releaseAllResources();
+            }
+            allViews.clear();
 
-        for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : cachedBuffers) {
-            tuple2.f0.buffer().recycleBuffer();
-        }
-        cachedBuffers.clear();
+            for (ResultSubpartitionView view : unregisteredAvailableViews) {
+                view.releaseAllResources();
+            }
+            unregisteredAvailableViews.clear();
+
+            for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : cachedBuffers) {
+                tuple2.f0.buffer().recycleBuffer();
+            }
+            cachedBuffers.clear();
 
-        isReleased = true;
+            isReleased = true;
+        }
     }
 
     @Override
     public boolean isReleased() {
-        return isReleased;
+        synchronized (lock) {
+            return isReleased;
+        }
     }
 
     @Override
@@ -207,10 +243,12 @@ public class UnionResultSubpartitionView
     @Override
     public Throwable getFailureCause() {
         Throwable cause = null;
-        for (ResultSubpartitionView view : allViews.values()) {
-            if (view.getFailureCause() != null) {
-                cause = view.getFailureCause();
-                LOG.error(cause.toString());
+        synchronized (lock) {
+            for (ResultSubpartitionView view : allViews.values()) {
+                if (view.getFailureCause() != null) {
+                    cause = view.getFailureCause();
+                    LOG.error(cause.toString());
+                }
             }
         }
         return cause;
@@ -258,8 +296,10 @@ public class UnionResultSubpartitionView
 
     @Override
     public void notifyNewBufferSize(int newBufferSize) {
-        for (ResultSubpartitionView view : allViews.values()) {
-            view.notifyNewBufferSize(newBufferSize);
+        synchronized (lock) {
+            for (ResultSubpartitionView view : allViews.values()) {
+                view.notifyNewBufferSize(newBufferSize);
+            }
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
index 9649dd6d501..ef9d430b663 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
@@ -138,6 +138,26 @@ class UnionResultSubpartitionViewTest {
         assertThat(buffers1).allMatch(Buffer::isRecycled);
     }
 
+    @Test
+    public void testDataAvailableBeforeRegistration() {
+        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {});
+        view0 = new TestingResultSubpartitionView(view, buffers0);
+
+        view0.notifyDataAvailable();
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog1.getBacklog()).isZero();
+        assertThat(availabilityAndBacklog1.isAvailable()).isFalse();
+
+        view.notifyViewCreated(0, view0);
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog2.getBacklog()).isPositive();
+        assertThat(availabilityAndBacklog2.isAvailable()).isTrue();
+    }
+
     private static class TestingResultSubpartitionView extends NoOpResultSubpartitionView {
         private final BufferAvailabilityListener listener;
         private final List<Buffer> buffers;

Reply via email to