xintongsong commented on code in PR #22855:
URL: https://github.com/apache/flink/pull/22855#discussion_r1257405998


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java:
##########
@@ -42,6 +44,8 @@ public class DiskTierConsumerAgent implements 
TierConsumerAgent {
                     Map<TieredStorageSubpartitionId, 
CompletableFuture<NettyConnectionReader>>>
             nettyConnectionReaders = new HashMap<>();
 
+    private AvailabilityAndPriorityNotifier notifier;

Review Comment:
   This is `null` initially. We should mark it `@Nullable`, and add checks 
before using it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityAndPriorityNotifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link RemoteStorageScanner} is introduced to notify asynchronously for 
file reading on
+ * remote storage. Asynchronous notifications will prevent {@link 
RemoteTierConsumerAgent} from
+ * repeatedly attempting to read remote files and reduce CPU consumption.
+ *
+ * <p>It will be invoked by {@link RemoteTierConsumerAgent} to watch the 
required segments and scan
+ * the existence status of the segments. If the segment file is found, it will 
notify the
+ * availability of segment file.
+ */
+public class RemoteStorageScanner implements Runnable {
+
+    /** The initial scan interval is 100ms. */
+    private static final int INITIAL_SCAN_INTERVAL_MS = 100;
+
+    /** The max scan interval is 10000ms. */
+    private static final int MAX_SCAN_INTERVAL_MS = 10_000;
+
+    /** Executor to scan the existence status of segment files on remote 
storage. */
+    private final ScheduledExecutorService scannerExecutor =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("remote storage file scanner")
+                            .build());
+
+    /** The key is partition id and subpartition id, the value is required 
segment id. */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            requiredSegmentIds;
+
+    /**
+     * The key is partition id and subpartition id, the value is max id of 
written segment files in
+     * the subpartition.
+     */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            scannedMaxSegmentIds;
+
+    private final String baseRemoteStoragePath;
+
+    private final ScanStrategy scanStrategy;
+
+    private FileSystem remoteFileSystem;
+
+    private AvailabilityAndPriorityNotifier notifier;
+
+    private int attemptNumber = 0;
+
+    public RemoteStorageScanner(String baseRemoteStoragePath) {
+        this.baseRemoteStoragePath = baseRemoteStoragePath;
+        this.requiredSegmentIds = new ConcurrentHashMap<>();
+        this.scannedMaxSegmentIds = new ConcurrentHashMap<>();
+        this.scanStrategy = new ScanStrategy(INITIAL_SCAN_INTERVAL_MS, 
MAX_SCAN_INTERVAL_MS);
+        try {
+            this.remoteFileSystem = new 
Path(baseRemoteStoragePath).getFileSystem();
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(
+                    e, "Failed to initialize file system on the path: " + 
baseRemoteStoragePath);
+        }
+    }
+
+    /** Start the executor. */
+    public void start() {
+        scannerExecutor.schedule(
+                this, scanStrategy.getInterval(attemptNumber), 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Watch the segment for a specific subpartition in the {@link 
RemoteStorageScanner}.
+     *
+     * <p>If a segment with a larger or equal id already exists, the current 
segment won't be
+     * watched.
+     *
+     * <p>If a segment with a smaller segment id is still being watched, the 
current segment will
+     * replace it because the smaller segment should have been consumed. This 
method ensures that
+     * only one segment file can be watched for each subpartition.
+     *
+     * @param partitionId is the id of partition.
+     * @param subpartitionId is the id of subpartition.
+     * @param segmentId is the id of segment.
+     */
+    public void watchSegment(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId) {
+        Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key =
+                Tuple2.of(partitionId, subpartitionId);
+        if (scannedMaxSegmentIds.getOrDefault(key, -1) >= segmentId) {
+            return;
+        }
+        requiredSegmentIds.put(key, segmentId);
+    }
+
+    /** Close the executor. */
+    public void close() {
+        scannerExecutor.shutdownNow();
+    }
+
+    /** Iterate the watched segment ids and check related file status. */
+    @Override
+    public void run() {
+        Iterator<Map.Entry<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>>
+                iterator = requiredSegmentIds.entrySet().iterator();
+        boolean scanned = false;
+        while (iterator.hasNext()) {
+            Map.Entry<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer> ids =
+                    iterator.next();
+            TieredStoragePartitionId partitionId = ids.getKey().f0;
+            TieredStorageSubpartitionId subpartitionId = ids.getKey().f1;
+            int requiredSegmentId = ids.getValue();
+            int maxSegmentId = scannedMaxSegmentIds.getOrDefault(ids.getKey(), 
-1);
+            if (maxSegmentId >= requiredSegmentId) {
+                scanned = true;
+                iterator.remove();
+                notifier.notifyAvailableAndPriority(partitionId, 
subpartitionId, false);
+            } else {
+                // The segment should be watched again because it's not found.
+                // If the segment belongs to other tiers and has been 
consumed, the segment will be
+                // replaced by newly watched segment with larger segment id. 
This logic is ensured
+                // by the method {@code watchSegment}.
+                scanMaxSegmentId(partitionId, subpartitionId);
+            }
+        }
+        attemptNumber = scanned ? 0 : attemptNumber + 1;
+        scannerExecutor.schedule(
+                this, scanStrategy.getInterval(attemptNumber), 
TimeUnit.MILLISECONDS);
+    }
+
+    public void 
registerAvailabilityAndPriorityNotifier(AvailabilityAndPriorityNotifier 
retriever) {
+        this.notifier = retriever;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Scan the max segment id of segment files for the specific partition and 
subpartition. The max
+     * segment id can be obtained from a file named by max segment id.
+     *
+     * @param partitionId the partition id.
+     * @param subpartitionId the subpartition id.
+     */
+    private void scanMaxSegmentId(
+            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
+        Path segmentFinishDir =
+                getSegmentFinishDirPath(
+                        baseRemoteStoragePath, partitionId, 
subpartitionId.getSubpartitionId());
+        FileStatus[] fileStatuses;
+        try {
+            if (!remoteFileSystem.exists(segmentFinishDir)) {
+                return;
+            }
+            fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to list the segment finish file. " + 
segmentFinishDir, e);
+        }
+        if (fileStatuses.length == 0) {
+            return;
+        }
+        checkState(fileStatuses.length == 1);
+        scannedMaxSegmentIds.put(
+                Tuple2.of(partitionId, subpartitionId),
+                Integer.parseInt(fileStatuses[0].getPath().getName()));
+    }
+
+    /**
+     * The strategy is used to decide the scan interval of {@link 
RemoteStorageScanner}. The
+     * interval will be updated exponentially and restricted by max value.
+     */
+    private static class ScanStrategy {
+
+        private final int initialScanInterval;
+
+        private final int maxScanInterval;
+
+        private ScanStrategy(int initialScanInterval, int maxScanInterval) {
+            checkArgument(
+                    initialScanInterval > 0,
+                    "initialScanInterval must be positive, was %s",
+                    initialScanInterval);
+            checkArgument(
+                    maxScanInterval > 0,
+                    "maxScanInterval must be positive, was %s",
+                    maxScanInterval);
+            checkArgument(
+                    initialScanInterval <= maxScanInterval,
+                    "initialScanInterval must be lower than or equal to 
maxScanInterval",
+                    maxScanInterval);
+            this.initialScanInterval = initialScanInterval;
+            this.maxScanInterval = maxScanInterval;
+        }
+
+        private long getInterval(int attempt) {
+            checkArgument(attempt >= 0, "attempt must not be negative (%s)", 
attempt);
+            final long interval = initialScanInterval * Math.round(Math.pow(2, 
attempt));
+            return interval >= 0 && interval < maxScanInterval ? interval : 
maxScanInterval;
+        }

Review Comment:
   Relying on `interval >= 0` to detect overflows can be problematic, because 
`attempt` keeps growing and it's possible that an overflowed interval still 
falls into the range `[0, maxScanInterval)`. For example, with an initial 
interval of 100, `attempt = 62` yields `100 * 2^62`, which wraps around to 
exactly `0` in `long` arithmetic. Alternatively, you may replace `attempt` with 
a `lastInterval`, which stops growing after reaching the max.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityAndPriorityNotifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link RemoteStorageScanner} is introduced to notify asynchronously for 
file reading on
+ * remote storage. Asynchronous notifications will prevent {@link 
RemoteTierConsumerAgent} from
+ * repeatedly attempting to read remote files and reduce CPU consumption.
+ *
+ * <p>It will be invoked by {@link RemoteTierConsumerAgent} to watch the 
required segments and scan
+ * the existence status of the segments. If the segment file is found, it will 
notify the
+ * availability of segment file.
+ */
+public class RemoteStorageScanner implements Runnable {
+
+    /** The initial scan interval is 100ms. */
+    private static final int INITIAL_SCAN_INTERVAL_MS = 100;
+
+    /** The max scan interval is 10000ms. */
+    private static final int MAX_SCAN_INTERVAL_MS = 10_000;
+
+    /** Executor to scan the existence status of segment files on remote 
storage. */
+    private final ScheduledExecutorService scannerExecutor =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("remote storage file scanner")
+                            .build());
+
+    /** The key is partition id and subpartition id, the value is required 
segment id. */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            requiredSegmentIds;
+
+    /**
+     * The key is partition id and subpartition id, the value is max id of 
written segment files in
+     * the subpartition.
+     */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            scannedMaxSegmentIds;
+
+    private final String baseRemoteStoragePath;
+
+    private final ScanStrategy scanStrategy;
+
+    private FileSystem remoteFileSystem;
+
+    private AvailabilityAndPriorityNotifier notifier;
+
+    private int attemptNumber = 0;
+
+    public RemoteStorageScanner(String baseRemoteStoragePath) {
+        this.baseRemoteStoragePath = baseRemoteStoragePath;
+        this.requiredSegmentIds = new ConcurrentHashMap<>();
+        this.scannedMaxSegmentIds = new ConcurrentHashMap<>();
+        this.scanStrategy = new ScanStrategy(INITIAL_SCAN_INTERVAL_MS, 
MAX_SCAN_INTERVAL_MS);
+        try {
+            this.remoteFileSystem = new 
Path(baseRemoteStoragePath).getFileSystem();
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(
+                    e, "Failed to initialize file system on the path: " + 
baseRemoteStoragePath);
+        }
+    }
+
+    /** Start the executor. */
+    public void start() {
+        scannerExecutor.schedule(
+                this, scanStrategy.getInterval(attemptNumber), 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Watch the segment for a specific subpartition in the {@link 
RemoteStorageScanner}.
+     *
+     * <p>If a segment with a larger or equal id already exists, the current 
segment won't be
+     * watched.
+     *
+     * <p>If a segment with a smaller segment id is still being watched, the 
current segment will
+     * replace it because the smaller segment should have been consumed. This 
method ensures that
+     * only one segment file can be watched for each subpartition.
+     *
+     * @param partitionId is the id of partition.
+     * @param subpartitionId is the id of subpartition.
+     * @param segmentId is the id of segment.
+     */
+    public void watchSegment(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId) {
+        Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key =
+                Tuple2.of(partitionId, subpartitionId);
+        if (scannedMaxSegmentIds.getOrDefault(key, -1) >= segmentId) {
+            return;
+        }
+        requiredSegmentIds.put(key, segmentId);

Review Comment:
   Minor: Consider using `Map#compute` to reduce the number of map accesses.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java:
##########
@@ -42,6 +44,8 @@ public class DiskTierConsumerAgent implements 
TierConsumerAgent {
                     Map<TieredStorageSubpartitionId, 
CompletableFuture<NettyConnectionReader>>>
             nettyConnectionReaders = new HashMap<>();
 
+    private AvailabilityAndPriorityNotifier notifier;

Review Comment:
   And same for `MemoryTierConsumerAgent`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityAndPriorityNotifier;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** The data client is used to fetch data from remote tier. */
+public class RemoteTierConsumerAgent implements TierConsumerAgent {
+
+    private final RemoteStorageScanner remoteStorageScanner;
+
+    private final PartitionFileReader partitionFileReader;
+
+    /**
+     * The current reading buffer indexes and segment ids stored in map.
+     *
+     * <p>The key is partition id and subpartition id. The value is buffer 
index and segment id.
+     */
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Tuple2<Integer, Integer>>>
+            currentBufferIndexAndSegmentIds;
+
+    private final int bufferSizeBytes;
+
+    private AvailabilityAndPriorityNotifier notifier;

Review Comment:
   Same here for the potential null-value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -1146,21 +1151,34 @@ private void setupTieredStorageNettyService(
             int channelIndex = index;
             channelSuppliers.add(() -> channels[channelIndex]);
         }
-        nettyService.setupInputChannels(
-                tieredStorageConsumerSpecs,
-                channelSuppliers,
-                new NettyConnectionReaderAvailabilityAndPriorityHelper() {
-                    @Override
-                    public void notifyReaderAvailableAndPriority(
-                            int channelIndex, boolean isPriority) {
-                        queueChannel(channels[channelIndex], null, isPriority);
-                    }
+        nettyService.setupInputChannels(tieredStorageConsumerSpecs, 
channelSuppliers);
+    }
 
-                    @Override
-                    public void updatePrioritySequenceNumber(int channelIndex, 
int sequenceNumber) {
-                        lastPrioritySequenceNumber[channelIndex] = 
sequenceNumber;
-                    }
-                });
+    /** The default implementation of {@link AvailabilityAndPriorityNotifier}. 
*/
+    private class AvailabilityAndPriorityNotifierImpl implements 
AvailabilityAndPriorityNotifier {
+
+        private final Map<TieredStoragePartitionId, 
Map<TieredStorageSubpartitionId, Integer>>
+                channelIndexes;
+
+        public AvailabilityAndPriorityNotifierImpl() {
+            this.channelIndexes = new HashMap<>();
+            for (int index = 0; index < 
checkNotNull(tieredStorageConsumerSpecs).size(); index++) {
+                TieredStorageConsumerSpec spec = 
tieredStorageConsumerSpecs.get(index);
+                channelIndexes
+                        .computeIfAbsent(spec.getPartitionId(), ignore -> new 
HashMap<>())
+                        .put(spec.getSubpartitionId(), index);
+            }
+        }
+
+        public void notifyAvailableAndPriority(
+                TieredStoragePartitionId partitionId,
+                TieredStorageSubpartitionId subpartitionId,
+                boolean isPriority) {
+            queueChannel(
+                    
channels[channelIndexes.get(partitionId).get(subpartitionId)],
+                    null,
+                    isPriority);
+        }

Review Comment:
   Why don't we need the last priority sequence number anymore?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityAndPriorityNotifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link RemoteStorageScanner} is introduced to notify asynchronously for 
file reading on
+ * remote storage. Asynchronous notifications will prevent {@link 
RemoteTierConsumerAgent} from
+ * repeatedly attempting to read remote files and reduce CPU consumption.
+ *
+ * <p>It will be invoked by {@link RemoteTierConsumerAgent} to watch the 
required segments and scan
+ * the existence status of the segments. If the segment file is found, it will 
notify the
+ * availability of segment file.
+ */
+public class RemoteStorageScanner implements Runnable {
+
+    /** The initial scan interval is 100ms. */
+    private static final int INITIAL_SCAN_INTERVAL_MS = 100;
+
+    /** The max scan interval is 10000ms. */
+    private static final int MAX_SCAN_INTERVAL_MS = 10_000;
+
+    /** Executor to scan the existence status of segment files on remote 
storage. */
+    private final ScheduledExecutorService scannerExecutor =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("remote storage file scanner")
+                            .build());
+
+    /** The key is partition id and subpartition id, the value is required 
segment id. */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            requiredSegmentIds;
+
+    /**
+     * The key is partition id and subpartition id, the value is max id of 
written segment files in
+     * the subpartition.
+     */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            scannedMaxSegmentIds;
+
+    private final String baseRemoteStoragePath;
+
+    private final ScanStrategy scanStrategy;
+
+    private FileSystem remoteFileSystem;

Review Comment:
   Why is this not `final`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java:
##########
@@ -90,7 +91,12 @@ public TierProducerAgent createProducerAgent(
     public TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
             TieredStorageNettyService nettyService) {
-        // TODO, implement the remote tier consumer agent
-        return null;
+        PartitionFileReader partitionFileReader =
+                SegmentPartitionFile.createPartitionFileReader(
+                        getTieredStoragePath(remoteStorageBasePath));
+        return new RemoteTierConsumerAgent(
+                new 
RemoteStorageScanner(getTieredStoragePath(remoteStorageBasePath)),
+                partitionFileReader,
+                bufferSizeBytes);

Review Comment:
   Minor:
   1. `getTieredStoragePath(remoteStorageBasePath)` can be deduplicated
   2. It might be better to align code-style for `partitionFileReader` and 
`remoteStorageScanner`, i.e. creating them either both inline when creating the 
agent, or both before creating the agent.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityAndPriorityNotifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentFinishDirPath;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link RemoteStorageScanner} is introduced to notify asynchronously for 
file reading on
+ * remote storage. Asynchronous notifications will prevent {@link 
RemoteTierConsumerAgent} from
+ * repeatedly attempting to read remote files and reduce CPU consumption.
+ *
+ * <p>It will be invoked by {@link RemoteTierConsumerAgent} to watch the 
required segments and scan
+ * the existence status of the segments. If the segment file is found, it will 
notify the
+ * availability of segment file.
+ */
+public class RemoteStorageScanner implements Runnable {
+
+    /** The initial scan interval is 100ms. */
+    private static final int INITIAL_SCAN_INTERVAL_MS = 100;
+
+    /** The max scan interval is 10000ms. */
+    private static final int MAX_SCAN_INTERVAL_MS = 10_000;
+
+    /** Executor to scan the existence status of segment files on remote 
storage. */
+    private final ScheduledExecutorService scannerExecutor =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("remote storage file scanner")
+                            .build());
+
+    /** The key is partition id and subpartition id, the value is required 
segment id. */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            requiredSegmentIds;
+
+    /**
+     * The key is partition id and subpartition id, the value is max id of 
written segment files in
+     * the subpartition.
+     */
+    private final Map<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>
+            scannedMaxSegmentIds;
+
+    private final String baseRemoteStoragePath;
+
+    private final ScanStrategy scanStrategy;
+
+    private FileSystem remoteFileSystem;
+
+    private AvailabilityAndPriorityNotifier notifier;
+
+    private int attemptNumber = 0;
+
+    public RemoteStorageScanner(String baseRemoteStoragePath) {
+        this.baseRemoteStoragePath = baseRemoteStoragePath;
+        this.requiredSegmentIds = new ConcurrentHashMap<>();
+        this.scannedMaxSegmentIds = new ConcurrentHashMap<>();
+        this.scanStrategy = new ScanStrategy(INITIAL_SCAN_INTERVAL_MS, 
MAX_SCAN_INTERVAL_MS);
+        try {
+            this.remoteFileSystem = new 
Path(baseRemoteStoragePath).getFileSystem();
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(
+                    e, "Failed to initialize file system on the path: " + 
baseRemoteStoragePath);
+        }
+    }
+
+    /** Start the executor. */
+    public void start() {
+        scannerExecutor.schedule(
+                this, scanStrategy.getInterval(attemptNumber), 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Watch the segment for a specific subpartition in the {@link 
RemoteStorageScanner}.
+     *
+     * <p>If a segment with a larger or equal id already exists, the current 
segment won't be
+     * watched.
+     *
+     * <p>If a segment with a smaller segment id is still being watched, the 
current segment will
+     * replace it because the smaller segment should have been consumed. This 
method ensures that
+     * only one segment file can be watched for each subpartition.
+     *
+     * @param partitionId is the id of partition.
+     * @param subpartitionId is the id of subpartition.
+     * @param segmentId is the id of segment.
+     */
+    public void watchSegment(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId) {
+        Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key =
+                Tuple2.of(partitionId, subpartitionId);
+        if (scannedMaxSegmentIds.getOrDefault(key, -1) >= segmentId) {
+            return;
+        }
+        requiredSegmentIds.put(key, segmentId);
+    }
+
+    /** Close the executor. */
+    public void close() {
+        scannerExecutor.shutdownNow();
+    }
+
+    /** Iterate the watched segment ids and check related file status. */
+    @Override
+    public void run() {
+        Iterator<Map.Entry<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer>>
+                iterator = requiredSegmentIds.entrySet().iterator();
+        boolean scanned = false;
+        while (iterator.hasNext()) {
+            Map.Entry<Tuple2<TieredStoragePartitionId, 
TieredStorageSubpartitionId>, Integer> ids =
+                    iterator.next();
+            TieredStoragePartitionId partitionId = ids.getKey().f0;
+            TieredStorageSubpartitionId subpartitionId = ids.getKey().f1;
+            int requiredSegmentId = ids.getValue();
+            int maxSegmentId = scannedMaxSegmentIds.getOrDefault(ids.getKey(), 
-1);
+            if (maxSegmentId >= requiredSegmentId) {
+                scanned = true;
+                iterator.remove();
+                notifier.notifyAvailableAndPriority(partitionId, 
subpartitionId, false);

Review Comment:
   This doesn't seem right. A required segment may not exist in the remote tier 
at all. `maxSegmentId >= requiredSegmentId` does not necessarily mean the segment 
exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to