xintongsong commented on code in PR #22855: URL: https://github.com/apache/flink/pull/22855#discussion_r1248809827
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSegmentFinishPath; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSubpartitionPath; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on + * remote storage. Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from + * repeatedly attempting to read remote files and reduce CPU consumption. + * + * <p>It will be invoked by {@link RemoteTierConsumerAgent} to register the required segment id and + * monitor the existence status of related segment files. If the segment file is found, it will + * trigger the reading process of the file. + */ +public class RemoteStorageScanner implements Runnable { + + /** Executor to scan the existence status of segment files on remote storage. */ + private final ScheduledExecutorService scannerExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("remote storage file scanner") + .build()); + + /** + * Tuple3 containing partition id and subpartition id and segment id. The tuple3 is stored in + * queue and indicates the required segment file. + */ + private final Queue<Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer>> + requiredSegmentIds; + + /** + * The channel indexes stored in map. + * + * <p>The key is partition id and subpartition id. The value is related channel index. 
+ */ + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + channelIndexes; + + private final String baseRemoteStoragePath; + + private FileSystem remoteFileSystem; + + private RemoteStorageScannerAvailabilityAndPriorityHelper helper; + + public RemoteStorageScanner( + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + String baseRemoteStoragePath) { + this.baseRemoteStoragePath = baseRemoteStoragePath; + this.channelIndexes = new HashMap<>(); + for (int index = 0; index < tieredStorageConsumerSpecs.size(); index++) { + TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index); + channelIndexes + .computeIfAbsent(spec.getPartitionId(), ignore -> new HashMap<>()) + .put(spec.getSubpartitionId(), index); + } + this.requiredSegmentIds = new LinkedBlockingDeque<>(); + try { + this.remoteFileSystem = new Path(baseRemoteStoragePath).getFileSystem(); + } catch (IOException e) { + ExceptionUtils.rethrow( + e, "Failed to initialize file system on the path: " + baseRemoteStoragePath); + } + } + + /** Start the executor. */ + public void start() { + scannerExecutor.scheduleAtFixedRate(this, 0, 10, TimeUnit.MILLISECONDS); Review Comment: 1. 10ms might be too aggressive. This would likely overwhelm the remote storage. Moreover, as we already fall back to the remote tier, I don't think it's that critical to discover the segments promptly. 2. We should at least make this constant a static final field. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScannerAvailabilityAndPriorityHelper.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +/** + * {@link RemoteStorageScannerAvailabilityAndPriorityHelper} is used to help the {@link + * RemoteStorageScanner} to notify the availability and priority of the reading process for the + * specific partition and subpartition on remote storage. + */ +public interface RemoteStorageScannerAvailabilityAndPriorityHelper { Review Comment: Looks like this is identical to `NettyConnectionReaderAvailabilityAndPriorityHelper`, and even their implementations `SingleInputGate` are identical. They should be deduplicated ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSegmentFinishPath; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSubpartitionPath; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on + * remote storage. 
Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from + * repeatedly attempting to read remote files and reduce CPU consumption. + * + * <p>It will be invoked by {@link RemoteTierConsumerAgent} to register the required segment id and + * monitor the existence status of related segment files. If the segment file is found, it will + * trigger the reading process of the file. + */ +public class RemoteStorageScanner implements Runnable { + + /** Executor to scan the existence status of segment files on remote storage. */ + private final ScheduledExecutorService scannerExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("remote storage file scanner") + .build()); + + /** + * Tuple3 containing partition id and subpartition id and segment id. The tuple3 is stored in + * queue and indicates the required segment file. + */ + private final Queue<Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer>> + requiredSegmentIds; + + /** + * The channel indexes stored in map. + * + * <p>The key is partition id and subpartition id. The value is related channel index. 
+ */ + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + channelIndexes; + + private final String baseRemoteStoragePath; + + private FileSystem remoteFileSystem; + + private RemoteStorageScannerAvailabilityAndPriorityHelper helper; + + public RemoteStorageScanner( + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + String baseRemoteStoragePath) { + this.baseRemoteStoragePath = baseRemoteStoragePath; + this.channelIndexes = new HashMap<>(); + for (int index = 0; index < tieredStorageConsumerSpecs.size(); index++) { + TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index); + channelIndexes + .computeIfAbsent(spec.getPartitionId(), ignore -> new HashMap<>()) + .put(spec.getSubpartitionId(), index); + } + this.requiredSegmentIds = new LinkedBlockingDeque<>(); + try { + this.remoteFileSystem = new Path(baseRemoteStoragePath).getFileSystem(); + } catch (IOException e) { + ExceptionUtils.rethrow( + e, "Failed to initialize file system on the path: " + baseRemoteStoragePath); + } + } + + /** Start the executor. */ + public void start() { + scannerExecutor.scheduleAtFixedRate(this, 0, 10, TimeUnit.MILLISECONDS); + } + + /** + * Register a segment id to the {@link RemoteStorageScanner}. If the scanner discovers the + * segment file exists, it will trigger the reading process of the segment file. + * + * @param partitionId is the id of partition. + * @param subpartitionId is the id of subpartition. + * @param segmentId is the id of segment. + */ + public void registerSegmentId( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + int segmentId) { + requiredSegmentIds.add(Tuple3.of(partitionId, subpartitionId, segmentId)); + } + + /** Close the executor. */ + public void close() { + scannerExecutor.shutdownNow(); + } + + /** Iterate the registered segment ids and check related file status. 
*/ + @Override + public void run() { + int segmentFileNumber = requiredSegmentIds.size(); + while (segmentFileNumber-- > 0) { + Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer> + partitionIdAndSubpartitionIdAndSegmentId = + checkNotNull(requiredSegmentIds.poll()); + if (checkFileExist( + partitionIdAndSubpartitionIdAndSegmentId.f0, + partitionIdAndSubpartitionIdAndSegmentId.f1, + partitionIdAndSubpartitionIdAndSegmentId.f2)) { + helper.notifyAvailableAndPriority( + channelIndexes + .get(partitionIdAndSubpartitionIdAndSegmentId.f0) + .get(partitionIdAndSubpartitionIdAndSegmentId.f1), + false); + } else { + requiredSegmentIds.add(partitionIdAndSubpartitionIdAndSegmentId); + } + } + } + + public void setupRemoteStorageScannerAvailabilityAndPriorityHelper( + RemoteStorageScannerAvailabilityAndPriorityHelper helper) { + this.helper = helper; + } + + public void triggerNextRoundReading( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + boolean isPriority) { + helper.notifyAvailableAndPriority( + channelIndexes.get(partitionId).get(subpartitionId), isPriority); + } + + public void updatePrioritySequenceNumber( Review Comment: Same here. Why are we updating the priority and sequence number through the scanner? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSegmentFinishPath; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSubpartitionPath; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on + * remote storage. 
Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from + * repeatedly attempting to read remote files and reduce CPU consumption. + * + * <p>It will be invoked by {@link RemoteTierConsumerAgent} to register the required segment id and + * monitor the existence status of related segment files. If the segment file is found, it will + * trigger the reading process of the file. + */ +public class RemoteStorageScanner implements Runnable { + + /** Executor to scan the existence status of segment files on remote storage. */ + private final ScheduledExecutorService scannerExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("remote storage file scanner") + .build()); + + /** + * Tuple3 containing partition id and subpartition id and segment id. The tuple3 is stored in + * queue and indicates the required segment file. + */ + private final Queue<Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer>> + requiredSegmentIds; + + /** + * The channel indexes stored in map. + * + * <p>The key is partition id and subpartition id. The value is related channel index. 
+ */ + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + channelIndexes; + + private final String baseRemoteStoragePath; + + private FileSystem remoteFileSystem; + + private RemoteStorageScannerAvailabilityAndPriorityHelper helper; + + public RemoteStorageScanner( + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + String baseRemoteStoragePath) { + this.baseRemoteStoragePath = baseRemoteStoragePath; + this.channelIndexes = new HashMap<>(); + for (int index = 0; index < tieredStorageConsumerSpecs.size(); index++) { + TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index); + channelIndexes + .computeIfAbsent(spec.getPartitionId(), ignore -> new HashMap<>()) + .put(spec.getSubpartitionId(), index); + } + this.requiredSegmentIds = new LinkedBlockingDeque<>(); + try { + this.remoteFileSystem = new Path(baseRemoteStoragePath).getFileSystem(); + } catch (IOException e) { + ExceptionUtils.rethrow( + e, "Failed to initialize file system on the path: " + baseRemoteStoragePath); + } + } + + /** Start the executor. */ + public void start() { + scannerExecutor.scheduleAtFixedRate(this, 0, 10, TimeUnit.MILLISECONDS); + } + + /** + * Register a segment id to the {@link RemoteStorageScanner}. If the scanner discovers the + * segment file exists, it will trigger the reading process of the segment file. + * + * @param partitionId is the id of partition. + * @param subpartitionId is the id of subpartition. + * @param segmentId is the id of segment. + */ + public void registerSegmentId( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + int segmentId) { + requiredSegmentIds.add(Tuple3.of(partitionId, subpartitionId, segmentId)); + } + + /** Close the executor. */ + public void close() { + scannerExecutor.shutdownNow(); + } + + /** Iterate the registered segment ids and check related file status. 
*/ + @Override + public void run() { + int segmentFileNumber = requiredSegmentIds.size(); + while (segmentFileNumber-- > 0) { + Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer> + partitionIdAndSubpartitionIdAndSegmentId = + checkNotNull(requiredSegmentIds.poll()); + if (checkFileExist( + partitionIdAndSubpartitionIdAndSegmentId.f0, + partitionIdAndSubpartitionIdAndSegmentId.f1, + partitionIdAndSubpartitionIdAndSegmentId.f2)) { + helper.notifyAvailableAndPriority( + channelIndexes + .get(partitionIdAndSubpartitionIdAndSegmentId.f0) + .get(partitionIdAndSubpartitionIdAndSegmentId.f1), + false); + } else { + requiredSegmentIds.add(partitionIdAndSubpartitionIdAndSegmentId); Review Comment: What if a segment is no longer needed? How do we remove it? E.g., the consumer doesn't know which tier a segment is in and asks each tier for it, and later obtains the segment from another tier. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java: ########## @@ -75,7 +75,9 @@ public TierProducerAgent createProducerAgent( @Override public TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, - TieredStorageNettyService nettyService) { + TieredStorageNettyService nettyService, + RemoteStorageScanner remoteStorageScanner, + PartitionFileReader partitionFileReader) { // TODO, implement the remote tier consumer agent Review Comment: Shouldn't we be able to create the agent now? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSegmentFinishPath; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSubpartitionPath; +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on + * remote storage. Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from + * repeatedly attempting to read remote files and reduce CPU consumption. + * + * <p>It will be invoked by {@link RemoteTierConsumerAgent} to register the required segment id and + * monitor the existence status of related segment files. If the segment file is found, it will + * trigger the reading process of the file. + */ +public class RemoteStorageScanner implements Runnable { + + /** Executor to scan the existence status of segment files on remote storage. */ + private final ScheduledExecutorService scannerExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("remote storage file scanner") + .build()); + + /** + * Tuple3 containing partition id and subpartition id and segment id. The tuple3 is stored in + * queue and indicates the required segment file. + */ + private final Queue<Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer>> + requiredSegmentIds; + + /** + * The channel indexes stored in map. + * + * <p>The key is partition id and subpartition id. The value is related channel index. 
+ */ + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + channelIndexes; + + private final String baseRemoteStoragePath; + + private FileSystem remoteFileSystem; + + private RemoteStorageScannerAvailabilityAndPriorityHelper helper; + + public RemoteStorageScanner( + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + String baseRemoteStoragePath) { + this.baseRemoteStoragePath = baseRemoteStoragePath; + this.channelIndexes = new HashMap<>(); + for (int index = 0; index < tieredStorageConsumerSpecs.size(); index++) { + TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index); + channelIndexes + .computeIfAbsent(spec.getPartitionId(), ignore -> new HashMap<>()) + .put(spec.getSubpartitionId(), index); + } + this.requiredSegmentIds = new LinkedBlockingDeque<>(); + try { + this.remoteFileSystem = new Path(baseRemoteStoragePath).getFileSystem(); + } catch (IOException e) { + ExceptionUtils.rethrow( + e, "Failed to initialize file system on the path: " + baseRemoteStoragePath); + } + } + + /** Start the executor. */ + public void start() { + scannerExecutor.scheduleAtFixedRate(this, 0, 10, TimeUnit.MILLISECONDS); + } + + /** + * Register a segment id to the {@link RemoteStorageScanner}. If the scanner discovers the + * segment file exists, it will trigger the reading process of the segment file. + * + * @param partitionId is the id of partition. + * @param subpartitionId is the id of subpartition. + * @param segmentId is the id of segment. + */ + public void registerSegmentId( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + int segmentId) { + requiredSegmentIds.add(Tuple3.of(partitionId, subpartitionId, segmentId)); + } + + /** Close the executor. */ + public void close() { + scannerExecutor.shutdownNow(); + } + + /** Iterate the registered segment ids and check related file status. 
*/ + @Override + public void run() { + int segmentFileNumber = requiredSegmentIds.size(); + while (segmentFileNumber-- > 0) { + Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer> + partitionIdAndSubpartitionIdAndSegmentId = + checkNotNull(requiredSegmentIds.poll()); + if (checkFileExist( Review Comment: Instead of checking the existence of individual segment files separately, I wonder if it makes sense to list all files at once and check for each segment. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/HashPartitionFileReader.java: ########## @@ -40,8 +83,43 @@ public Buffer readBuffer( MemorySegment memorySegment, BufferRecycler recycler) throws IOException { - // TODO, implement the HashPartitionFileReader - return null; + Tuple2<ReadableByteChannel, Integer> fileChannelAndSegmentId = + openedChannelAndSegmentIds + .computeIfAbsent(partitionId, ignore -> new HashMap<>()) + .getOrDefault(subpartitionId, Tuple2.of(null, -1)); + ReadableByteChannel channel = fileChannelAndSegmentId.f0; + if (channel == null || fileChannelAndSegmentId.f1 != segmentId) { + if (channel != null) { + channel.close(); + } + channel = openNewChannel(partitionId, subpartitionId, segmentId); + if (channel == null) { + return null; + } + openedChannelAndSegmentIds + .get(partitionId) Review Comment: This `get` is unnecessary if we save `openedChannelAndSegmentIds.computeIfAbsent(...)` in a variable. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSegmentFinishPath; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateSubpartitionPath; +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link RemoteStorageScanner} is introduced to notify asynchronously for file reading on + * remote storage. Asynchronous notifications will prevent {@link RemoteTierConsumerAgent} from + * repeatedly attempting to read remote files and reduce CPU consumption. + * + * <p>It will be invoked by {@link RemoteTierConsumerAgent} to register the required segment id and + * monitor the existence status of related segment files. If the segment file is found, it will + * trigger the reading process of the file. + */ +public class RemoteStorageScanner implements Runnable { + + /** Executor to scan the existence status of segment files on remote storage. */ + private final ScheduledExecutorService scannerExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("remote storage file scanner") + .build()); + + /** + * Tuple3 containing partition id and subpartition id and segment id. The tuple3 is stored in + * queue and indicates the required segment file. + */ + private final Queue<Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer>> + requiredSegmentIds; + + /** + * The channel indexes stored in map. + * + * <p>The key is partition id and subpartition id. The value is related channel index. 
+ */ + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + channelIndexes; + + private final String baseRemoteStoragePath; + + private FileSystem remoteFileSystem; + + private RemoteStorageScannerAvailabilityAndPriorityHelper helper; + + public RemoteStorageScanner( + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + String baseRemoteStoragePath) { + this.baseRemoteStoragePath = baseRemoteStoragePath; + this.channelIndexes = new HashMap<>(); + for (int index = 0; index < tieredStorageConsumerSpecs.size(); index++) { + TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index); + channelIndexes + .computeIfAbsent(spec.getPartitionId(), ignore -> new HashMap<>()) + .put(spec.getSubpartitionId(), index); + } + this.requiredSegmentIds = new LinkedBlockingDeque<>(); + try { + this.remoteFileSystem = new Path(baseRemoteStoragePath).getFileSystem(); + } catch (IOException e) { + ExceptionUtils.rethrow( + e, "Failed to initialize file system on the path: " + baseRemoteStoragePath); + } + } + + /** Start the executor. */ + public void start() { + scannerExecutor.scheduleAtFixedRate(this, 0, 10, TimeUnit.MILLISECONDS); + } + + /** + * Register a segment id to the {@link RemoteStorageScanner}. If the scanner discovers the + * segment file exists, it will trigger the reading process of the segment file. + * + * @param partitionId is the id of partition. + * @param subpartitionId is the id of subpartition. + * @param segmentId is the id of segment. + */ + public void registerSegmentId( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + int segmentId) { + requiredSegmentIds.add(Tuple3.of(partitionId, subpartitionId, segmentId)); + } + + /** Close the executor. */ + public void close() { + scannerExecutor.shutdownNow(); + } + + /** Iterate the registered segment ids and check related file status. 
*/ + @Override + public void run() { + int segmentFileNumber = requiredSegmentIds.size(); + while (segmentFileNumber-- > 0) { + Tuple3<TieredStoragePartitionId, TieredStorageSubpartitionId, Integer> + partitionIdAndSubpartitionIdAndSegmentId = + checkNotNull(requiredSegmentIds.poll()); + if (checkFileExist( + partitionIdAndSubpartitionIdAndSegmentId.f0, + partitionIdAndSubpartitionIdAndSegmentId.f1, + partitionIdAndSubpartitionIdAndSegmentId.f2)) { + helper.notifyAvailableAndPriority( + channelIndexes + .get(partitionIdAndSubpartitionIdAndSegmentId.f0) + .get(partitionIdAndSubpartitionIdAndSegmentId.f1), + false); + } else { + requiredSegmentIds.add(partitionIdAndSubpartitionIdAndSegmentId); + } + } + } + + public void setupRemoteStorageScannerAvailabilityAndPriorityHelper( + RemoteStorageScannerAvailabilityAndPriorityHelper helper) { + this.helper = helper; + } + + public void triggerNextRoundReading( + TieredStoragePartitionId partitionId, + TieredStorageSubpartitionId subpartitionId, + boolean isPriority) { + helper.notifyAvailableAndPriority( + channelIndexes.get(partitionId).get(subpartitionId), isPriority); + } Review Comment: It's unclear to me what this method does. Why would we trigger reading through a scanner? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
