This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 46aaea80830 [FLINK-35130][runtime] Simplify AvailabilityNotifierImpl to support speculative scheduler and improve performance
46aaea80830 is described below

commit 46aaea8083047fc86c35491336d795ddcd565128
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Apr 17 16:21:52 2024 +0800

    [FLINK-35130][runtime] Simplify AvailabilityNotifierImpl to support speculative scheduler and improve performance
---
 .../partition/consumer/SingleInputGate.java        | 41 +++++++---------------
 .../tiered/common/TieredStorageInputChannelId.java |  4 +++
 .../tiered/storage/AvailabilityNotifier.java       | 11 ------
 .../tiered/tier/remote/RemoteStorageScanner.java   | 13 ++++---
 .../tier/remote/RemoteTierConsumerAgent.java       | 34 ++++++++++++------
 .../tier/remote/RemoteStorageScannerTest.java      | 19 ++++------
 .../tier/remote/TestingAvailabilityNotifier.java   | 17 +++------
 7 files changed, 60 insertions(+), 79 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 5fdbc6aeb78..a4346d62eb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -1301,39 +1301,22 @@ public class SingleInputGate extends IndexedInputGate {
     /** The default implementation of {@link AvailabilityNotifier}. */
     private class AvailabilityNotifierImpl implements AvailabilityNotifier {
 
-        private final Map<TieredStoragePartitionId, 
Map<TieredStorageSubpartitionId, Integer>>
-                subpartitionIdMap;
-
-        private final Map<TieredStoragePartitionId, 
Map<TieredStorageInputChannelId, Integer>>
-                channelIdMap;
-
-        private AvailabilityNotifierImpl() {
-            this.subpartitionIdMap = new HashMap<>();
-            this.channelIdMap = new HashMap<>();
-            for (int index = 0; index < 
checkNotNull(tieredStorageConsumerSpecs).size(); index++) {
-                TieredStorageConsumerSpec spec = 
tieredStorageConsumerSpecs.get(index);
-                for (int subpartitionId : spec.getSubpartitionIds().values()) {
-                    subpartitionIdMap
-                            .computeIfAbsent(spec.getPartitionId(), ignore -> 
new HashMap<>())
-                            .put(new 
TieredStorageSubpartitionId(subpartitionId), index);
-                }
-                channelIdMap
-                        .computeIfAbsent(spec.getPartitionId(), ignore -> new 
HashMap<>())
-                        .put(spec.getInputChannelId(), index);
-            }
-        }
-
-        @Override
-        public void notifyAvailable(
-                TieredStoragePartitionId partitionId, 
TieredStorageSubpartitionId subpartitionId) {
-            queueChannel(
-                    
channels[subpartitionIdMap.get(partitionId).get(subpartitionId)], null, false);
-        }
+        private AvailabilityNotifierImpl() {}
 
         @Override
         public void notifyAvailable(
                 TieredStoragePartitionId partitionId, 
TieredStorageInputChannelId inputChannelId) {
-            
queueChannel(channels[channelIdMap.get(partitionId).get(inputChannelId)], null, 
false);
+            Map<InputChannelInfo, InputChannel> channels =
+                    
inputChannels.get(partitionId.getPartitionID().getPartitionId());
+            if (channels == null) {
+                return;
+            }
+            InputChannelInfo inputChannelInfo =
+                    new InputChannelInfo(gateIndex, 
inputChannelId.getInputChannelId());
+            InputChannel inputChannel = channels.get(inputChannelInfo);
+            if (inputChannel != null) {
+                queueChannel(inputChannel, null, false);
+            }
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
index d5e77a4f112..0e454a74abd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
@@ -32,6 +32,10 @@ public class TieredStorageInputChannelId implements 
TieredStorageDataIdentifier,
         this.inputChannelId = inputChannelId;
     }
 
+    public int getInputChannelId() {
+        return inputChannelId;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hashCode(inputChannelId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
index 5816446c80a..d66946142c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
 /**
  * {@link AvailabilityNotifier} is used to notify that the data from the 
specific partition and
@@ -28,16 +27,6 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
  */
 public interface AvailabilityNotifier {
 
-    /**
-     * Notify that the data from the specific partition and subpartition is 
available in tiered
-     * storage.
-     *
-     * @param partitionId the partition id.
-     * @param subpartitionId the subpartition id.
-     */
-    void notifyAvailable(
-            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId);
-
     /**
      * Notify that the data for the specific partition and input channel is 
available in tiered
      * storage.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
index 00358b34fe9..64ed0c04c85 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -42,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
@@ -92,7 +92,8 @@ public class RemoteStorageScanner implements Runnable {
 
     private final FileSystem remoteFileSystem;
 
-    @Nullable private AvailabilityNotifier notifier;
+    @Nullable
+    private BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> 
availabilityNotifier;
 
     private int lastInterval = INITIAL_SCAN_INTERVAL_MS;
 
@@ -191,7 +192,7 @@ public class RemoteStorageScanner implements Runnable {
                         && checkSegmentExist(partitionId, subpartitionId, 
requiredSegmentId)) {
                     scanned = true;
                     iterator.remove();
-                    checkNotNull(notifier).notifyAvailable(partitionId, 
subpartitionId);
+                    checkNotNull(availabilityNotifier).accept(partitionId, 
subpartitionId);
                 } else {
                     // The segment should be watched again because it's not 
found.
                     // If the segment belongs to other tiers and has been 
consumed, the segment will
@@ -210,8 +211,10 @@ public class RemoteStorageScanner implements Runnable {
         }
     }
 
-    public void registerAvailabilityAndPriorityNotifier(AvailabilityNotifier 
retriever) {
-        this.notifier = retriever;
+    public void registerAvailabilityAndPriorityNotifier(
+            BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
+                    availabilityNotifier) {
+        this.availabilityNotifier = availabilityNotifier;
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
index b953dc07db2..07b19cf3b4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
@@ -44,7 +44,7 @@ import java.util.Optional;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The data client is used to fetch data from remote tier. */
-public class RemoteTierConsumerAgent implements TierConsumerAgent, 
AvailabilityNotifier {
+public class RemoteTierConsumerAgent implements TierConsumerAgent {
 
     private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
 
@@ -62,6 +62,11 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
                     Map<TieredStorageSubpartitionId, Tuple2<Integer, Integer>>>
             currentBufferIndexAndSegmentIds;
 
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, 
TieredStorageInputChannelId>>
+            inputChannelIds;
+
     /** The indexes of all the subpartitions with data available stored in 
FIFO order. */
     private final Map<TieredStoragePartitionId, 
DeduplicatedQueue<TieredStorageSubpartitionId>>
             availableSubpartitionsQueues = new HashMap<>();
@@ -78,12 +83,20 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
         this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
         this.remoteStorageScanner = remoteStorageScanner;
         this.currentBufferIndexAndSegmentIds = new HashMap<>();
+        this.inputChannelIds = new HashMap<>();
         this.partitionFileReader = partitionFileReader;
         this.bufferSizeBytes = bufferSizeBytes;
-        
this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this);
+        
this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this::notifyAvailable);
         for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
             availableSubpartitionsQueues.putIfAbsent(
                     spec.getPartitionId(), new DeduplicatedQueue<>());
+            for (int subpartitionId : spec.getSubpartitionIds().values()) {
+                inputChannelIds
+                        .computeIfAbsent(spec.getPartitionId(), partition -> 
new HashMap<>())
+                        .put(
+                                new 
TieredStorageSubpartitionId(subpartitionId),
+                                spec.getInputChannelId());
+            }
         }
     }
 
@@ -175,7 +188,6 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
         remoteStorageScanner.close();
     }
 
-    @Override
     public void notifyAvailable(
             TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
         synchronized (availableSubpartitionsQueues) {
@@ -183,12 +195,14 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent, AvailabilityN
                 return;
             }
         }
-        this.notifier.notifyAvailable(partitionId, subpartitionId);
-    }
-
-    @Override
-    public void notifyAvailable(
-            TieredStoragePartitionId partitionId, TieredStorageInputChannelId 
inputChannelId) {
-        throw new UnsupportedOperationException("This method should not be 
invoked.");
+        Map<TieredStorageSubpartitionId, TieredStorageInputChannelId> 
subpartitionChannels =
+                inputChannelIds.get(partitionId);
+        if (subpartitionChannels == null) {
+            return;
+        }
+        TieredStorageInputChannelId inputChannelId = 
subpartitionChannels.get(subpartitionId);
+        if (inputChannelId != null) {
+            notifier.notifyAvailable(partitionId, inputChannelId);
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
index 9b207b04da2..edc354df948 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
@@ -60,10 +61,8 @@ class RemoteStorageScannerTest {
     @Test
     void testWatchSegment() throws IOException {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        TestingAvailabilityNotifier notifier =
-                new TestingAvailabilityNotifier.Builder()
-                        .setNotifyFunction(((partitionId, subpartitionId) -> 
future))
-                        .build();
+        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> 
notifier =
+                (partitionId, subpartitionId) -> future.complete(null);
 
         // Create segment files with id 2, and finish file with id 2.
         createSegmentFile(2);
@@ -81,10 +80,8 @@ class RemoteStorageScannerTest {
     @Test
     void testWatchSegmentIgnored() throws IOException {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        TestingAvailabilityNotifier notifier =
-                new TestingAvailabilityNotifier.Builder()
-                        .setNotifyFunction(((partitionId, subpartitionId) -> 
future))
-                        .build();
+        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> 
notifier =
+                (partitionId, subpartitionId) -> future.complete(null);
 
         // Create segment files with id 2 and id 3, and finish file with id 3.
         createSegmentFile(2);
@@ -110,10 +107,8 @@ class RemoteStorageScannerTest {
     @Test
     void testStartAndClose() throws IOException, ExecutionException, 
InterruptedException {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        TestingAvailabilityNotifier notifier =
-                new TestingAvailabilityNotifier.Builder()
-                        .setNotifyFunction(((partitionId, subpartitionId) -> 
future))
-                        .build();
+        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> 
notifier =
+                (partitionId, subpartitionId) -> future.complete(null);
         createSegmentFile(0);
         createSegmentFinishFile(0);
         RemoteStorageScanner remoteStorageScanner = new 
RemoteStorageScanner(remoteStoragePath);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
index 7599e36698f..1839824a8c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 
 import java.util.concurrent.CompletableFuture;
@@ -30,28 +29,22 @@ import java.util.function.BiFunction;
 public class TestingAvailabilityNotifier implements AvailabilityNotifier {
 
     private final BiFunction<
-                    TieredStoragePartitionId, TieredStorageSubpartitionId, 
CompletableFuture<Void>>
+                    TieredStoragePartitionId, TieredStorageInputChannelId, 
CompletableFuture<Void>>
             notifyFunction;
 
     public TestingAvailabilityNotifier(
             BiFunction<
                             TieredStoragePartitionId,
-                            TieredStorageSubpartitionId,
+                            TieredStorageInputChannelId,
                             CompletableFuture<Void>>
                     notifyFunction) {
         this.notifyFunction = notifyFunction;
     }
 
-    @Override
-    public void notifyAvailable(
-            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
-        notifyFunction.apply(partitionId, subpartitionId).complete(null);
-    }
-
     @Override
     public void notifyAvailable(
             TieredStoragePartitionId partitionId, TieredStorageInputChannelId 
inputChannelId) {
-        throw new UnsupportedOperationException();
+        notifyFunction.apply(partitionId, inputChannelId);
     }
 
     /** Builder for {@link TestingAvailabilityNotifier}. */
@@ -59,7 +52,7 @@ public class TestingAvailabilityNotifier implements 
AvailabilityNotifier {
 
         private BiFunction<
                         TieredStoragePartitionId,
-                        TieredStorageSubpartitionId,
+                        TieredStorageInputChannelId,
                         CompletableFuture<Void>>
                 notifyFunction = (partitionId, subpartitionId) -> new 
CompletableFuture<>();
 
@@ -68,7 +61,7 @@ public class TestingAvailabilityNotifier implements 
AvailabilityNotifier {
         public Builder setNotifyFunction(
                 BiFunction<
                                 TieredStoragePartitionId,
-                                TieredStorageSubpartitionId,
+                                TieredStorageInputChannelId,
                                 CompletableFuture<Void>>
                         notifyFunction) {
             this.notifyFunction = notifyFunction;

Reply via email to