This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 973190e8ca5b7225f18b5c176726ef8680faffca
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Thu Jan 25 09:39:26 2024 +0800

    [FLINK-34233][runtime] Avoid enqueuing input channel before subpartitions initialized
---
 .../io/network/partition/ResultPartition.java      |  2 +-
 .../partition/UnionResultSubpartitionView.java     | 19 +++++++++--
 .../partition/consumer/SingleInputGate.java        |  3 --
 .../tier/remote/RemoteTierConsumerAgent.java       | 39 ++++++----------------
 .../partition/UnionResultSubpartitionViewTest.java | 24 +++++++++++--
 5 files changed, 49 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 3e98d6fb8eb..6361e07ce3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -320,7 +320,7 @@ public abstract class ResultPartition implements 
ResultPartitionWriter {
                     indexSet.values().iterator().next(), availabilityListener);
         } else {
             UnionResultSubpartitionView unionView =
-                    new UnionResultSubpartitionView(availabilityListener);
+                    new UnionResultSubpartitionView(availabilityListener, 
indexSet.size());
             try {
                 for (int i : indexSet.values()) {
                     ResultSubpartitionView view = createSubpartitionView(i, 
unionView);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
index eeb28fd0888..db65a215051 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
@@ -86,23 +86,31 @@ public class UnionResultSubpartitionView
     @GuardedBy("lock")
     private final Set<ResultSubpartitionView> unregisteredAvailableViews = new 
HashSet<>();
 
+    private final int numTotalViews;
+
     @GuardedBy("lock")
     private boolean isReleased;
 
     @GuardedBy("lock")
     private int sequenceNumber;
 
-    public UnionResultSubpartitionView(BufferAvailabilityListener 
availabilityListener) {
+    public UnionResultSubpartitionView(
+            BufferAvailabilityListener availabilityListener, int 
numTotalViews) {
         this.availabilityListener = availabilityListener;
         this.isReleased = false;
         this.sequenceNumber = 0;
+        this.numTotalViews = numTotalViews;
     }
 
     public void notifyViewCreated(int subpartitionId, ResultSubpartitionView 
view) {
         synchronized (lock) {
             allViews.put(subpartitionId, view);
-            if (unregisteredAvailableViews.remove(view)) {
-                notifyDataAvailable(view);
+            if (allViews.size() == numTotalViews) {
+                for (ResultSubpartitionView unregisteredAvailableView :
+                        unregisteredAvailableViews) {
+                    notifyDataAvailable(unregisteredAvailableView);
+                }
+                unregisteredAvailableViews.clear();
             }
         }
     }
@@ -180,6 +188,11 @@ public class UnionResultSubpartitionView
                 return;
             }
 
+            if (allViews.size() < numTotalViews) {
+                // Only notify availability after all views have been 
successfully created.
+                return;
+            }
+
             try {
                 cacheBuffer();
             } catch (IOException e) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 6993b035e03..354b1cf5241 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -377,9 +377,6 @@ public class SingleInputGate extends IndexedInputGate {
                         inputChannelsForCurrentPartition.put(
                                 realInputChannel.getChannelInfo(), 
realInputChannel);
                         channels[inputChannel.getChannelIndex()] = 
realInputChannel;
-                        if (enabledTieredStorage()) {
-                            queueChannel(realInputChannel, null, false);
-                        }
                     } catch (Throwable t) {
                         inputChannel.setError(t);
                         return;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
index 5ef0cf906df..b953dc07db2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
@@ -37,17 +37,17 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The data client is used to fetch data from remote tier. */
 public class RemoteTierConsumerAgent implements TierConsumerAgent, 
AvailabilityNotifier {
 
+    private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
+
     private final RemoteStorageScanner remoteStorageScanner;
 
     private final PartitionFileReader partitionFileReader;
@@ -70,25 +70,18 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
 
     private AvailabilityNotifier notifier;
 
-    private final Map<TieredStoragePartitionId, 
Set<TieredStorageSubpartitionId>>
-            initSubpartitionIds = new HashMap<>();
-
     public RemoteTierConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
             RemoteStorageScanner remoteStorageScanner,
             PartitionFileReader partitionFileReader,
             int bufferSizeBytes) {
+        this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
         this.remoteStorageScanner = remoteStorageScanner;
         this.currentBufferIndexAndSegmentIds = new HashMap<>();
         this.partitionFileReader = partitionFileReader;
         this.bufferSizeBytes = bufferSizeBytes;
         
this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this);
         for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
-            Set<TieredStorageSubpartitionId> subpartitionIds = new HashSet<>();
-            for (int subpartitionId : spec.getSubpartitionIds().values()) {
-                subpartitionIds.add(new 
TieredStorageSubpartitionId(subpartitionId));
-            }
-            initSubpartitionIds.put(spec.getPartitionId(), subpartitionIds);
             availableSubpartitionsQueues.putIfAbsent(
                     spec.getPartitionId(), new DeduplicatedQueue<>());
         }
@@ -97,21 +90,18 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
     @Override
     public void start() {
         remoteStorageScanner.start();
+        for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
+            for (int subpartitionId : spec.getSubpartitionIds().values()) {
+                remoteStorageScanner.watchSegment(
+                        spec.getPartitionId(), new 
TieredStorageSubpartitionId(subpartitionId), 0);
+            }
+        }
     }
 
     @Override
     public int peekNextBufferSubpartitionId(
             TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet 
indexSet)
             throws IOException {
-        if (initSubpartitionIds.containsKey(partitionId)) {
-            TieredStorageSubpartitionId subpartitionId =
-                    initSubpartitionIds.get(partitionId).iterator().next();
-            synchronized (availableSubpartitionsQueues) {
-                
availableSubpartitionsQueues.get(partitionId).add(subpartitionId);
-            }
-            return subpartitionId.getSubpartitionId();
-        }
-
         synchronized (availableSubpartitionsQueues) {
             for (TieredStorageSubpartitionId subpartitionId :
                     availableSubpartitionsQueues.get(partitionId).values()) {
@@ -128,20 +118,11 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId) {
-        if (initSubpartitionIds.containsKey(partitionId)) {
-            Set<TieredStorageSubpartitionId> subpartitionIds = 
initSubpartitionIds.get(partitionId);
-            if (subpartitionIds.remove(subpartitionId)) {
-                if (subpartitionIds.isEmpty()) {
-                    initSubpartitionIds.remove(partitionId);
-                }
-            }
-        }
-
         // Get current segment id and buffer index.
         Tuple2<Integer, Integer> bufferIndexAndSegmentId =
                 currentBufferIndexAndSegmentIds
                         .computeIfAbsent(partitionId, ignore -> new 
HashMap<>())
-                        .getOrDefault(subpartitionId, Tuple2.of(0, -1));
+                        .getOrDefault(subpartitionId, Tuple2.of(0, 0));
         int currentBufferIndex = bufferIndexAndSegmentId.f0;
         int currentSegmentId = bufferIndexAndSegmentId.f1;
         if (segmentId != currentSegmentId) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
index ef9d430b663..8f9d2ceee26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
@@ -47,7 +47,7 @@ class UnionResultSubpartitionViewTest {
 
     @BeforeEach
     void before() {
-        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> 
{});
+        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> 
{}, 2);
 
         buffers0 =
                 Arrays.asList(
@@ -140,10 +140,13 @@ class UnionResultSubpartitionViewTest {
 
     @Test
     public void testDataAvailableBeforeRegistration() {
-        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> 
{});
+        TestAvailabilityListener listener = new TestAvailabilityListener();
+        view = new UnionResultSubpartitionView(listener, 2);
         view0 = new TestingResultSubpartitionView(view, buffers0);
+        view1 = new TestingResultSubpartitionView(view, buffers1);
 
         view0.notifyDataAvailable();
+        assertThat(listener.isDataAvailable()).isFalse();
 
         ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 
=
                 view.getAvailabilityAndBacklog(true);
@@ -151,6 +154,10 @@ class UnionResultSubpartitionViewTest {
         assertThat(availabilityAndBacklog1.isAvailable()).isFalse();
 
         view.notifyViewCreated(0, view0);
+        assertThat(listener.isDataAvailable()).isFalse();
+
+        view.notifyViewCreated(1, view1);
+        assertThat(listener.isDataAvailable()).isTrue();
 
         ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 
=
                 view.getAvailabilityAndBacklog(true);
@@ -211,4 +218,17 @@ class UnionResultSubpartitionViewTest {
             return isReleased;
         }
     }
+
+    private static class TestAvailabilityListener implements 
BufferAvailabilityListener {
+        private boolean isDataAvailable = false;
+
+        @Override
+        public void notifyDataAvailable(ResultSubpartitionView view) {
+            isDataAvailable = true;
+        }
+
+        boolean isDataAvailable() {
+            return isDataAvailable;
+        }
+    }
 }

Reply via email to