xintongsong commented on code in PR #22855:
URL: https://github.com/apache/flink/pull/22855#discussion_r1260830009
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java:
##########
@@ -47,6 +48,13 @@ Optional<Buffer> getNextBuffer(
TieredStorageSubpartitionId subpartitionId,
int segmentId);
+ /**
+ * Register the notifier to notify the availability of a subpartition.
+ *
+ * @param notifier to notify availability and priority.
+ */
+ void registerAvailabilityAndPriorityNotifier(AvailabilityNotifier notifier);
Review Comment:
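Name needs update: the method registers an `AvailabilityNotifier`, so the name should match it (e.g., `registerAvailabilityNotifier`).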
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on
+ * remote storage. Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from
+ * repeatedly attempting to read remote files and reduce CPU consumption.
+ *
+ * <p>It will be invoked by {@link RemoteTierConsumerAgent} to watch the required segments and scan
+ * the existence status of the segments. If the segment file is found, it will notify the
+ * availability of segment file.
+ */
+public class RemoteStorageScanner implements Runnable {
+
+ /** The initial scan interval is 100ms. */
+ private static final int INITIAL_SCAN_INTERVAL_MS = 100;
+
+ /** The max scan interval is 10000ms. */
+ private static final int MAX_SCAN_INTERVAL_MS = 10_000;
+
+ /** Executor to scan the existence status of segment files on remote storage. */
+ private final ScheduledExecutorService scannerExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("remote storage file scanner")
+ .build());
+
+ /** The key is partition id and subpartition id, the value is required segment id. */
+ private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
+ requiredSegmentIds;
+
+ /**
+ * The key is partition id and subpartition id, the value is max id of written segment files in
+ * the subpartition.
+ */
+ private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
+ scannedMaxSegmentIds;
+
+ private final String baseRemoteStoragePath;
+
+ private final ScanStrategy scanStrategy;
+
+ private final FileSystem remoteFileSystem;
+
+ private AvailabilityNotifier notifier;
+
+ private int lastInterval = INITIAL_SCAN_INTERVAL_MS;
+
+ public RemoteStorageScanner(String baseRemoteStoragePath) {
+ this.baseRemoteStoragePath = baseRemoteStoragePath;
+ this.requiredSegmentIds = new ConcurrentHashMap<>();
+ this.scannedMaxSegmentIds = new ConcurrentHashMap<>();
+ this.scanStrategy = new ScanStrategy(MAX_SCAN_INTERVAL_MS);
+ this.remoteFileSystem = createFileSystem();
+ }
+
+ private FileSystem createFileSystem() {
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = new Path(baseRemoteStoragePath).getFileSystem();
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(
+ e, "Failed to initialize file system on the path: " + baseRemoteStoragePath);
+ }
+ return fileSystem;
+ }
+
+ /** Start the executor. */
+ public void start() {
+ scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Watch the segment for a specific subpartition in the {@link RemoteStorageScanner}.
+ *
+ * <p>If a segment with a larger or equal id already exists, the current segment won't be
+ * watched.
+ *
+ * <p>If a segment with a smaller segment id is still being watched, the current segment will
+ * replace it because the smaller segment should have been consumed. This method ensures that
+ * only one segment file can be watched for each subpartition.
+ *
+ * @param partitionId is the id of partition.
+ * @param subpartitionId is the id of subpartition.
+ * @param segmentId is the id of segment.
+ */
+ public void watchSegment(
+ TieredStoragePartitionId partitionId,
+ TieredStorageSubpartitionId subpartitionId,
+ int segmentId) {
+ Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key =
+ Tuple2.of(partitionId, subpartitionId);
+ scannedMaxSegmentIds.compute(
+ key,
+ (segmentKey, maxSegmentId) -> {
+ if (maxSegmentId == null || maxSegmentId < segmentId) {
+ requiredSegmentIds.put(segmentKey, segmentId);
+ }
+ return maxSegmentId;
+ });
+ }
+
+ /** Close the executor. */
+ public void close() {
+ scannerExecutor.shutdownNow();
+ }
+
+ /** Iterate the watched segment ids and check related file status. */
+ @Override
+ public void run() {
+ Iterator<Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>>
+ iterator = requiredSegmentIds.entrySet().iterator();
+ boolean scanned = false;
+ while (iterator.hasNext()) {
+ Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer> ids =
+ iterator.next();
+ TieredStoragePartitionId partitionId = ids.getKey().f0;
+ TieredStorageSubpartitionId subpartitionId = ids.getKey().f1;
+ int requiredSegmentId = ids.getValue();
+ int maxSegmentId = scannedMaxSegmentIds.getOrDefault(ids.getKey(), -1);
+ if (maxSegmentId >= requiredSegmentId
+ && checkSegmentExist(partitionId, subpartitionId, requiredSegmentId)) {
+ scanned = true;
+ iterator.remove();
+ notifier.notifyAvailable(partitionId, subpartitionId);
+ } else {
+ // The segment should be watched again because it's not found.
+ // If the segment belongs to other tiers and has been consumed, the segment will be
+ // replaced by newly watched segment with larger segment id. This logic is ensured
+ // by the method {@code watchSegment}.
+ scanMaxSegmentId(partitionId, subpartitionId);
+ }
+ }
+ lastInterval = scanned ? INITIAL_SCAN_INTERVAL_MS : scanStrategy.getInterval(lastInterval);
+ start();
+ }
+
+ public void registerAvailabilityAndPriorityNotifier(AvailabilityNotifier retriever) {
Review Comment:
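Same here for the name: it should match the `AvailabilityNotifier` being registered (e.g., `registerAvailabilityNotifier`).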
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]