This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 46aaea80830 [FLINK-35130][runtime] Simplify AvailabilityNotifierImpl to support speculative scheduler and improve performance
46aaea80830 is described below
commit 46aaea8083047fc86c35491336d795ddcd565128
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Apr 17 16:21:52 2024 +0800
[FLINK-35130][runtime] Simplify AvailabilityNotifierImpl to support speculative scheduler and improve performance
---
.../partition/consumer/SingleInputGate.java | 41 +++++++---------------
.../tiered/common/TieredStorageInputChannelId.java | 4 +++
.../tiered/storage/AvailabilityNotifier.java | 11 ------
.../tiered/tier/remote/RemoteStorageScanner.java | 13 ++++---
.../tier/remote/RemoteTierConsumerAgent.java | 34 ++++++++++++------
.../tier/remote/RemoteStorageScannerTest.java | 19 ++++------
.../tier/remote/TestingAvailabilityNotifier.java | 17 +++------
7 files changed, 60 insertions(+), 79 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 5fdbc6aeb78..a4346d62eb1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -1301,39 +1301,22 @@ public class SingleInputGate extends IndexedInputGate {
/** The default implementation of {@link AvailabilityNotifier}. */
private class AvailabilityNotifierImpl implements AvailabilityNotifier {
- private final Map<TieredStoragePartitionId,
Map<TieredStorageSubpartitionId, Integer>>
- subpartitionIdMap;
-
- private final Map<TieredStoragePartitionId,
Map<TieredStorageInputChannelId, Integer>>
- channelIdMap;
-
- private AvailabilityNotifierImpl() {
- this.subpartitionIdMap = new HashMap<>();
- this.channelIdMap = new HashMap<>();
- for (int index = 0; index <
checkNotNull(tieredStorageConsumerSpecs).size(); index++) {
- TieredStorageConsumerSpec spec =
tieredStorageConsumerSpecs.get(index);
- for (int subpartitionId : spec.getSubpartitionIds().values()) {
- subpartitionIdMap
- .computeIfAbsent(spec.getPartitionId(), ignore ->
new HashMap<>())
- .put(new
TieredStorageSubpartitionId(subpartitionId), index);
- }
- channelIdMap
- .computeIfAbsent(spec.getPartitionId(), ignore -> new
HashMap<>())
- .put(spec.getInputChannelId(), index);
- }
- }
-
- @Override
- public void notifyAvailable(
- TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId) {
- queueChannel(
-
channels[subpartitionIdMap.get(partitionId).get(subpartitionId)], null, false);
- }
+ private AvailabilityNotifierImpl() {}
@Override
public void notifyAvailable(
TieredStoragePartitionId partitionId,
TieredStorageInputChannelId inputChannelId) {
-
queueChannel(channels[channelIdMap.get(partitionId).get(inputChannelId)], null,
false);
+ Map<InputChannelInfo, InputChannel> channels =
+
inputChannels.get(partitionId.getPartitionID().getPartitionId());
+ if (channels == null) {
+ return;
+ }
+ InputChannelInfo inputChannelInfo =
+ new InputChannelInfo(gateIndex,
inputChannelId.getInputChannelId());
+ InputChannel inputChannel = channels.get(inputChannelInfo);
+ if (inputChannel != null) {
+ queueChannel(inputChannel, null, false);
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
index d5e77a4f112..0e454a74abd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageInputChannelId.java
@@ -32,6 +32,10 @@ public class TieredStorageInputChannelId implements
TieredStorageDataIdentifier,
this.inputChannelId = inputChannelId;
}
+ public int getInputChannelId() {
+ return inputChannelId;
+ }
+
@Override
public int hashCode() {
return Objects.hashCode(inputChannelId);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
index 5816446c80a..d66946142c8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityNotifier.java
@@ -20,7 +20,6 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
-import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
/**
* {@link AvailabilityNotifier} is used to notify that the data from the
specific partition and
@@ -28,16 +27,6 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
*/
public interface AvailabilityNotifier {
- /**
- * Notify that the data from the specific partition and subpartition is
available in tiered
- * storage.
- *
- * @param partitionId the partition id.
- * @param subpartitionId the subpartition id.
- */
- void notifyAvailable(
- TieredStoragePartitionId partitionId, TieredStorageSubpartitionId
subpartitionId);
-
/**
* Notify that the data for the specific partition and input channel is
available in tiered
* storage.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
index 00358b34fe9..64ed0c04c85 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
-import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -42,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
@@ -92,7 +92,8 @@ public class RemoteStorageScanner implements Runnable {
private final FileSystem remoteFileSystem;
- @Nullable private AvailabilityNotifier notifier;
+ @Nullable
+ private BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
availabilityNotifier;
private int lastInterval = INITIAL_SCAN_INTERVAL_MS;
@@ -191,7 +192,7 @@ public class RemoteStorageScanner implements Runnable {
&& checkSegmentExist(partitionId, subpartitionId,
requiredSegmentId)) {
scanned = true;
iterator.remove();
- checkNotNull(notifier).notifyAvailable(partitionId,
subpartitionId);
+ checkNotNull(availabilityNotifier).accept(partitionId,
subpartitionId);
} else {
// The segment should be watched again because it's not
found.
// If the segment belongs to other tiers and has been
consumed, the segment will
@@ -210,8 +211,10 @@ public class RemoteStorageScanner implements Runnable {
}
}
- public void registerAvailabilityAndPriorityNotifier(AvailabilityNotifier
retriever) {
- this.notifier = retriever;
+ public void registerAvailabilityAndPriorityNotifier(
+ BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
+ availabilityNotifier) {
+ this.availabilityNotifier = availabilityNotifier;
}
// ------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
index b953dc07db2..07b19cf3b4c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
@@ -44,7 +44,7 @@ import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkState;
/** The data client is used to fetch data from remote tier. */
-public class RemoteTierConsumerAgent implements TierConsumerAgent,
AvailabilityNotifier {
+public class RemoteTierConsumerAgent implements TierConsumerAgent {
private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
@@ -62,6 +62,11 @@ public class RemoteTierConsumerAgent implements
TierConsumerAgent, AvailabilityN
Map<TieredStorageSubpartitionId, Tuple2<Integer, Integer>>>
currentBufferIndexAndSegmentIds;
+ private final Map<
+ TieredStoragePartitionId,
+ Map<TieredStorageSubpartitionId,
TieredStorageInputChannelId>>
+ inputChannelIds;
+
/** The indexes of all the subpartitions with data available stored in
FIFO order. */
private final Map<TieredStoragePartitionId,
DeduplicatedQueue<TieredStorageSubpartitionId>>
availableSubpartitionsQueues = new HashMap<>();
@@ -78,12 +83,20 @@ public class RemoteTierConsumerAgent implements
TierConsumerAgent, AvailabilityN
this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
this.remoteStorageScanner = remoteStorageScanner;
this.currentBufferIndexAndSegmentIds = new HashMap<>();
+ this.inputChannelIds = new HashMap<>();
this.partitionFileReader = partitionFileReader;
this.bufferSizeBytes = bufferSizeBytes;
-
this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this);
+
this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this::notifyAvailable);
for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
availableSubpartitionsQueues.putIfAbsent(
spec.getPartitionId(), new DeduplicatedQueue<>());
+ for (int subpartitionId : spec.getSubpartitionIds().values()) {
+ inputChannelIds
+ .computeIfAbsent(spec.getPartitionId(), partition ->
new HashMap<>())
+ .put(
+ new
TieredStorageSubpartitionId(subpartitionId),
+ spec.getInputChannelId());
+ }
}
}
@@ -175,7 +188,6 @@ public class RemoteTierConsumerAgent implements
TierConsumerAgent, AvailabilityN
remoteStorageScanner.close();
}
- @Override
public void notifyAvailable(
TieredStoragePartitionId partitionId, TieredStorageSubpartitionId
subpartitionId) {
synchronized (availableSubpartitionsQueues) {
@@ -183,12 +195,14 @@ public class RemoteTierConsumerAgent implements
TierConsumerAgent, AvailabilityN
return;
}
}
- this.notifier.notifyAvailable(partitionId, subpartitionId);
- }
-
- @Override
- public void notifyAvailable(
- TieredStoragePartitionId partitionId, TieredStorageInputChannelId
inputChannelId) {
- throw new UnsupportedOperationException("This method should not be
invoked.");
+ Map<TieredStorageSubpartitionId, TieredStorageInputChannelId>
subpartitionChannels =
+ inputChannelIds.get(partitionId);
+ if (subpartitionChannels == null) {
+ return;
+ }
+ TieredStorageInputChannelId inputChannelId =
subpartitionChannels.get(subpartitionId);
+ if (inputChannelId != null) {
+ notifier.notifyAvailable(partitionId, inputChannelId);
+ }
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
index 9b207b04da2..edc354df948 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
@@ -60,10 +61,8 @@ class RemoteStorageScannerTest {
@Test
void testWatchSegment() throws IOException {
CompletableFuture<Void> future = new CompletableFuture<>();
- TestingAvailabilityNotifier notifier =
- new TestingAvailabilityNotifier.Builder()
- .setNotifyFunction(((partitionId, subpartitionId) ->
future))
- .build();
+ BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
notifier =
+ (partitionId, subpartitionId) -> future.complete(null);
// Create segment files with id 2, and finish file with id 2.
createSegmentFile(2);
@@ -81,10 +80,8 @@ class RemoteStorageScannerTest {
@Test
void testWatchSegmentIgnored() throws IOException {
CompletableFuture<Void> future = new CompletableFuture<>();
- TestingAvailabilityNotifier notifier =
- new TestingAvailabilityNotifier.Builder()
- .setNotifyFunction(((partitionId, subpartitionId) ->
future))
- .build();
+ BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
notifier =
+ (partitionId, subpartitionId) -> future.complete(null);
// Create segment files with id 2 and id 3, and finish file with id 3.
createSegmentFile(2);
@@ -110,10 +107,8 @@ class RemoteStorageScannerTest {
@Test
void testStartAndClose() throws IOException, ExecutionException,
InterruptedException {
CompletableFuture<Void> future = new CompletableFuture<>();
- TestingAvailabilityNotifier notifier =
- new TestingAvailabilityNotifier.Builder()
- .setNotifyFunction(((partitionId, subpartitionId) ->
future))
- .build();
+ BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
notifier =
+ (partitionId, subpartitionId) -> future.complete(null);
createSegmentFile(0);
createSegmentFinishFile(0);
RemoteStorageScanner remoteStorageScanner = new
RemoteStorageScanner(remoteStoragePath);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
index 7599e36698f..1839824a8c3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/TestingAvailabilityNotifier.java
@@ -20,7 +20,6 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
-import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import java.util.concurrent.CompletableFuture;
@@ -30,28 +29,22 @@ import java.util.function.BiFunction;
public class TestingAvailabilityNotifier implements AvailabilityNotifier {
private final BiFunction<
- TieredStoragePartitionId, TieredStorageSubpartitionId,
CompletableFuture<Void>>
+ TieredStoragePartitionId, TieredStorageInputChannelId,
CompletableFuture<Void>>
notifyFunction;
public TestingAvailabilityNotifier(
BiFunction<
TieredStoragePartitionId,
- TieredStorageSubpartitionId,
+ TieredStorageInputChannelId,
CompletableFuture<Void>>
notifyFunction) {
this.notifyFunction = notifyFunction;
}
- @Override
- public void notifyAvailable(
- TieredStoragePartitionId partitionId, TieredStorageSubpartitionId
subpartitionId) {
- notifyFunction.apply(partitionId, subpartitionId).complete(null);
- }
-
@Override
public void notifyAvailable(
TieredStoragePartitionId partitionId, TieredStorageInputChannelId
inputChannelId) {
- throw new UnsupportedOperationException();
+ notifyFunction.apply(partitionId, inputChannelId);
}
/** Builder for {@link TestingAvailabilityNotifier}. */
@@ -59,7 +52,7 @@ public class TestingAvailabilityNotifier implements
AvailabilityNotifier {
private BiFunction<
TieredStoragePartitionId,
- TieredStorageSubpartitionId,
+ TieredStorageInputChannelId,
CompletableFuture<Void>>
notifyFunction = (partitionId, subpartitionId) -> new
CompletableFuture<>();
@@ -68,7 +61,7 @@ public class TestingAvailabilityNotifier implements
AvailabilityNotifier {
public Builder setNotifyFunction(
BiFunction<
TieredStoragePartitionId,
- TieredStorageSubpartitionId,
+ TieredStorageInputChannelId,
CompletableFuture<Void>>
notifyFunction) {
this.notifyFunction = notifyFunction;