This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 973190e8ca5b7225f18b5c176726ef8680faffca Author: yunfengzhou-hub <[email protected]> AuthorDate: Thu Jan 25 09:39:26 2024 +0800 [FLINK-34233][runtime] Avoid enqueuing input channel before subpartitions initialized --- .../io/network/partition/ResultPartition.java | 2 +- .../partition/UnionResultSubpartitionView.java | 19 +++++++++-- .../partition/consumer/SingleInputGate.java | 3 -- .../tier/remote/RemoteTierConsumerAgent.java | 39 ++++++---------------- .../partition/UnionResultSubpartitionViewTest.java | 24 +++++++++++-- 5 files changed, 49 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 3e98d6fb8eb..6361e07ce3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -320,7 +320,7 @@ public abstract class ResultPartition implements ResultPartitionWriter { indexSet.values().iterator().next(), availabilityListener); } else { UnionResultSubpartitionView unionView = - new UnionResultSubpartitionView(availabilityListener); + new UnionResultSubpartitionView(availabilityListener, indexSet.size()); try { for (int i : indexSet.values()) { ResultSubpartitionView view = createSubpartitionView(i, unionView); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java index eeb28fd0888..db65a215051 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java @@ -86,23 +86,31 @@ public class UnionResultSubpartitionView @GuardedBy("lock") private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>(); + private final int numTotalViews; + @GuardedBy("lock") private boolean isReleased; @GuardedBy("lock") private int sequenceNumber; - public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener) { + public UnionResultSubpartitionView( + BufferAvailabilityListener availabilityListener, int numTotalViews) { this.availabilityListener = availabilityListener; this.isReleased = false; this.sequenceNumber = 0; + this.numTotalViews = numTotalViews; } public void notifyViewCreated(int subpartitionId, ResultSubpartitionView view) { synchronized (lock) { allViews.put(subpartitionId, view); - if (unregisteredAvailableViews.remove(view)) { - notifyDataAvailable(view); + if (allViews.size() == numTotalViews) { + for (ResultSubpartitionView unregisteredAvailableView : + unregisteredAvailableViews) { + notifyDataAvailable(unregisteredAvailableView); + } + unregisteredAvailableViews.clear(); } } } @@ -180,6 +188,11 @@ public class UnionResultSubpartitionView return; } + if (allViews.size() < numTotalViews) { + // Only notify availability after all views have been successfully created. + return; + } + try { cacheBuffer(); } catch (IOException e) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 6993b035e03..354b1cf5241 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -377,9 +377,6 @@ public class SingleInputGate extends IndexedInputGate { inputChannelsForCurrentPartition.put( realInputChannel.getChannelInfo(), realInputChannel); channels[inputChannel.getChannelIndex()] = realInputChannel; - if (enabledTieredStorage()) { - queueChannel(realInputChannel, null, false); - } } catch (Throwable t) { inputChannel.setError(t); return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java index 5ef0cf906df..b953dc07db2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java @@ -37,17 +37,17 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.apache.flink.util.Preconditions.checkState; /** The data client is used to fetch data from remote tier. */ public class RemoteTierConsumerAgent implements TierConsumerAgent, AvailabilityNotifier { + private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs; + private final RemoteStorageScanner remoteStorageScanner; private final PartitionFileReader partitionFileReader; @@ -70,25 +70,18 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent, AvailabilityN private AvailabilityNotifier notifier; - private final Map<TieredStoragePartitionId, Set<TieredStorageSubpartitionId>> - initSubpartitionIds = new HashMap<>(); - public RemoteTierConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, RemoteStorageScanner remoteStorageScanner, PartitionFileReader partitionFileReader, int bufferSizeBytes) { + this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs; this.remoteStorageScanner = remoteStorageScanner; this.currentBufferIndexAndSegmentIds = new HashMap<>(); this.partitionFileReader = partitionFileReader; this.bufferSizeBytes = bufferSizeBytes; this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this); for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) { - Set<TieredStorageSubpartitionId> subpartitionIds = new HashSet<>(); - for (int subpartitionId : spec.getSubpartitionIds().values()) { - subpartitionIds.add(new TieredStorageSubpartitionId(subpartitionId)); - } - initSubpartitionIds.put(spec.getPartitionId(), subpartitionIds); availableSubpartitionsQueues.putIfAbsent( spec.getPartitionId(), new DeduplicatedQueue<>()); } @@ -97,21 +90,18 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent, AvailabilityN @Override public void start() { remoteStorageScanner.start(); + for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) { + for (int subpartitionId : spec.getSubpartitionIds().values()) { + remoteStorageScanner.watchSegment( + spec.getPartitionId(), new TieredStorageSubpartitionId(subpartitionId), 0); + } + } } @Override public int peekNextBufferSubpartitionId( TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet indexSet) throws IOException { - if (initSubpartitionIds.containsKey(partitionId)) { - TieredStorageSubpartitionId subpartitionId = - initSubpartitionIds.get(partitionId).iterator().next(); - synchronized (availableSubpartitionsQueues) { - availableSubpartitionsQueues.get(partitionId).add(subpartitionId); - } - return subpartitionId.getSubpartitionId(); - } - synchronized (availableSubpartitionsQueues) { for (TieredStorageSubpartitionId subpartitionId : availableSubpartitionsQueues.get(partitionId).values()) { @@ -128,20 +118,11 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent, AvailabilityN TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId) { - if (initSubpartitionIds.containsKey(partitionId)) { - Set<TieredStorageSubpartitionId> subpartitionIds = initSubpartitionIds.get(partitionId); - if (subpartitionIds.remove(subpartitionId)) { - if (subpartitionIds.isEmpty()) { - initSubpartitionIds.remove(partitionId); - } - } - } - // Get current segment id and buffer index. Tuple2<Integer, Integer> bufferIndexAndSegmentId = currentBufferIndexAndSegmentIds .computeIfAbsent(partitionId, ignore -> new HashMap<>()) - .getOrDefault(subpartitionId, Tuple2.of(0, -1)); + .getOrDefault(subpartitionId, Tuple2.of(0, 0)); int currentBufferIndex = bufferIndexAndSegmentId.f0; int currentSegmentId = bufferIndexAndSegmentId.f1; if (segmentId != currentSegmentId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java index ef9d430b663..8f9d2ceee26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java @@ -47,7 +47,7 @@ class UnionResultSubpartitionViewTest { @BeforeEach void before() { - view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {}); + view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {}, 2); buffers0 = Arrays.asList( @@ -140,10 +140,13 @@ class UnionResultSubpartitionViewTest { @Test public void testDataAvailableBeforeRegistration() { - view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {}); + TestAvailabilityListener listener = new TestAvailabilityListener(); + view = new UnionResultSubpartitionView(listener, 2); view0 = new TestingResultSubpartitionView(view, buffers0); + view1 = new TestingResultSubpartitionView(view, buffers1); view0.notifyDataAvailable(); + assertThat(listener.isDataAvailable()).isFalse(); ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 = view.getAvailabilityAndBacklog(true); @@ -151,6 +154,10 @@ class UnionResultSubpartitionViewTest { assertThat(availabilityAndBacklog1.isAvailable()).isFalse(); view.notifyViewCreated(0, view0); + assertThat(listener.isDataAvailable()).isFalse(); + + view.notifyViewCreated(1, view1); + assertThat(listener.isDataAvailable()).isTrue(); ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 = view.getAvailabilityAndBacklog(true); @@ -211,4 +218,17 @@ class UnionResultSubpartitionViewTest { return isReleased; } } + + private static class TestAvailabilityListener implements BufferAvailabilityListener { + private boolean isDataAvailable = false; + + @Override + public void notifyDataAvailable(ResultSubpartitionView view) { + isDataAvailable = true; + } + + boolean isDataAvailable() { + return isDataAvailable; + } + } }
