This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 71c9760322ea8002b7e1af8d93c604287bd098ca Author: yunfengzhou-hub <[email protected]> AuthorDate: Wed Dec 13 11:00:45 2023 +0800 [FLINK-34225][runtime] Fix the race condition in UnionResultSubpartitionView --- .../partition/UnionResultSubpartitionView.java | 74 +++++++++++++++++----- .../partition/UnionResultSubpartitionViewTest.java | 20 ++++++ 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java index cede2da3219..eeb28fd0888 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java @@ -28,10 +28,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.util.HashSet; import java.util.LinkedList; import java.util.Queue; +import java.util.Set; /** * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides @@ -55,9 +58,11 @@ public class UnionResultSubpartitionView private final Object lock = new Object(); /** All the {@link ResultSubpartitionView}s managed by this class. */ + @GuardedBy("lock") private final BiMap<Integer, ResultSubpartitionView> allViews = HashBiMap.create(); /** All the {@link ResultSubpartitionView}s that have data available. */ + @GuardedBy("lock") private final SubpartitionSelector<ResultSubpartitionView> availableViews = new RoundRobinSubpartitionSelector<>(); @@ -68,11 +73,23 @@ public class UnionResultSubpartitionView * where each buffer comes from. 
Cache is used to provide the data type of the next buffer and * an estimation of the backlog, as required by {@link ResultSubpartition.BufferAndBacklog}. */ + @GuardedBy("lock") private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers = new LinkedList<>(); + /** + * A collection storing views that have triggered {@link + * #notifyDataAvailable(ResultSubpartitionView)} without {@link #notifyViewCreated(int, + * ResultSubpartitionView)}. This is used to resolve the race condition between these two + * methods. + */ + @GuardedBy("lock") + private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>(); + + @GuardedBy("lock") private boolean isReleased; + @GuardedBy("lock") private int sequenceNumber; public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener) { @@ -82,7 +99,12 @@ public class UnionResultSubpartitionView } public void notifyViewCreated(int subpartitionId, ResultSubpartitionView view) { - allViews.put(subpartitionId, view); + synchronized (lock) { + allViews.put(subpartitionId, view); + if (unregisteredAvailableViews.remove(view)) { + notifyDataAvailable(view); + } + } } @Override @@ -148,6 +170,11 @@ public class UnionResultSubpartitionView @Override public void notifyDataAvailable(ResultSubpartitionView view) { synchronized (lock) { + if (!allViews.containsValue(view)) { + unregisteredAvailableViews.add(view); + return; + } + if (!availableViews.notifyDataAvailable(view) || !cachedBuffers.isEmpty()) { // The availabilityListener has already been notified. 
return; @@ -174,22 +201,31 @@ public class UnionResultSubpartitionView @Override public void releaseAllResources() throws IOException { - for (ResultSubpartitionView view : allViews.values()) { - view.releaseAllResources(); - } - allViews.clear(); + synchronized (lock) { + for (ResultSubpartitionView view : allViews.values()) { + view.releaseAllResources(); + } + allViews.clear(); - for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : cachedBuffers) { - tuple2.f0.buffer().recycleBuffer(); - } - cachedBuffers.clear(); + for (ResultSubpartitionView view : unregisteredAvailableViews) { + view.releaseAllResources(); + } + unregisteredAvailableViews.clear(); + + for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : cachedBuffers) { + tuple2.f0.buffer().recycleBuffer(); + } + cachedBuffers.clear(); - isReleased = true; + isReleased = true; + } } @Override public boolean isReleased() { - return isReleased; + synchronized (lock) { + return isReleased; + } } @Override @@ -207,10 +243,12 @@ public class UnionResultSubpartitionView @Override public Throwable getFailureCause() { Throwable cause = null; - for (ResultSubpartitionView view : allViews.values()) { - if (view.getFailureCause() != null) { - cause = view.getFailureCause(); - LOG.error(cause.toString()); + synchronized (lock) { + for (ResultSubpartitionView view : allViews.values()) { + if (view.getFailureCause() != null) { + cause = view.getFailureCause(); + LOG.error(cause.toString()); + } } } return cause; @@ -258,8 +296,10 @@ public class UnionResultSubpartitionView @Override public void notifyNewBufferSize(int newBufferSize) { - for (ResultSubpartitionView view : allViews.values()) { - view.notifyNewBufferSize(newBufferSize); + synchronized (lock) { + for (ResultSubpartitionView view : allViews.values()) { + view.notifyNewBufferSize(newBufferSize); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java index 9649dd6d501..ef9d430b663 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java @@ -138,6 +138,26 @@ class UnionResultSubpartitionViewTest { assertThat(buffers1).allMatch(Buffer::isRecycled); } + @Test + public void testDataAvailableBeforeRegistration() { + view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {}); + view0 = new TestingResultSubpartitionView(view, buffers0); + + view0.notifyDataAvailable(); + + ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 = + view.getAvailabilityAndBacklog(true); + assertThat(availabilityAndBacklog1.getBacklog()).isZero(); + assertThat(availabilityAndBacklog1.isAvailable()).isFalse(); + + view.notifyViewCreated(0, view0); + + ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 = + view.getAvailabilityAndBacklog(true); + assertThat(availabilityAndBacklog2.getBacklog()).isPositive(); + assertThat(availabilityAndBacklog2.isAvailable()).isTrue(); + } + private static class TestingResultSubpartitionView extends NoOpResultSubpartitionView { private final BufferAvailabilityListener listener; private final List<Buffer> buffers;
