xintongsong commented on code in PR #22855:
URL: https://github.com/apache/flink/pull/22855#discussion_r1259375162
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -1146,21 +1151,35 @@ private void setupTieredStorageNettyService(
int channelIndex = index;
channelSuppliers.add(() -> channels[channelIndex]);
}
- nettyService.setupInputChannels(
- tieredStorageConsumerSpecs,
- channelSuppliers,
- new NettyConnectionReaderAvailabilityAndPriorityHelper() {
- @Override
- public void notifyReaderAvailableAndPriority(
- int channelIndex, boolean isPriority) {
- queueChannel(channels[channelIndex], null, isPriority);
- }
+ nettyService.setupInputChannels(tieredStorageConsumerSpecs, channelSuppliers);
+ }
- @Override
- public void updatePrioritySequenceNumber(int channelIndex, int sequenceNumber) {
- lastPrioritySequenceNumber[channelIndex] = sequenceNumber;
- }
- });
+ /** The default implementation of {@link AvailabilityAndPriorityNotifier}. */
+ private class AvailabilityAndPriorityNotifierImpl implements AvailabilityAndPriorityNotifier {
+
+ private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+ channelIndexes;
+
+ public AvailabilityAndPriorityNotifierImpl() {
Review Comment:
Should be private.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/AvailabilityAndPriorityNotifier.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+/**
+ * {@link AvailabilityAndPriorityNotifier} is used to notify the availability and priority status of
+ * a specific partition and subpartition in tiered storage.
+ */
+public interface AvailabilityAndPriorityNotifier {
+
+ /**
+ * Notify the availability and priority status of a specific partition and subpartition. This
+ * method will be invoked by {@link TieredStorageConsumerClient} if the subpartition in tiered
+ * storage has more available buffers or should be read with priority.
+ *
+ * @param partitionId the partition id.
+ * @param subpartitionId the subpartition id.
+ * @param isPriority the subpartition will be consumed with priority if the value is true
+ * otherwise not.
+ * @param prioritySequenceNumber the sequence number of priority buffer.
+ */
+ void notifyAvailableAndPriority(
+ TieredStoragePartitionId partitionId,
+ TieredStorageSubpartitionId subpartitionId,
+ boolean isPriority,
+ Integer prioritySequenceNumber);
Review Comment:
1. I'd suggest the name `priorityBufferIndex`, to align the terminology.
2. Why `Integer`, rather than `int`?
3. What is this value when `isPriority` is `false`?
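
For points 2 and 3, a minimal sketch of one possible shape, assuming a primitive `int` plus a documented sentinel for the non-priority case. All names here (`AvailabilityAndPriorityNotifierSketch`, `NO_PRIORITY_BUFFER_INDEX`, `RecordingNotifier`) are hypothetical, and the partition and subpartition ids are simplified to `int`; this is not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the interface under review; ids are simplified to int. */
interface AvailabilityAndPriorityNotifierSketch {

    /** Assumed sentinel: the value passed when isPriority is false. */
    int NO_PRIORITY_BUFFER_INDEX = -1;

    void notifyAvailableAndPriority(
            int partitionId, int subpartitionId, boolean isPriority, int priorityBufferIndex);
}

/** Hypothetical implementation that records calls and enforces the sentinel contract. */
final class RecordingNotifier implements AvailabilityAndPriorityNotifierSketch {

    final List<String> events = new ArrayList<>();

    @Override
    public void notifyAvailableAndPriority(
            int partitionId, int subpartitionId, boolean isPriority, int priorityBufferIndex) {
        // A non-priority notification must carry the sentinel, so the value is never ambiguous.
        if (!isPriority && priorityBufferIndex != NO_PRIORITY_BUFFER_INDEX) {
            throw new IllegalArgumentException("Non-priority notification with a buffer index.");
        }
        events.add(partitionId + "/" + subpartitionId + ":" + isPriority + ":" + priorityBufferIndex);
    }
}
```

With a primitive parameter the caller can no longer pass `null`, so the javadoc only has to define the sentinel.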
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java:
##########
@@ -34,14 +39,20 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** The data client is used to fetch data from disk tier. */
public class DiskTierConsumerAgent implements TierConsumerAgent {
private final Map<
TieredStoragePartitionId,
- Map<TieredStorageSubpartitionId, CompletableFuture<NettyConnectionReader>>>
+ Map<
+ TieredStorageSubpartitionId,
+ Tuple2<CompletableFuture<NettyConnectionReader>, Integer>>>
nettyConnectionReaders = new HashMap<>();
+ @Nullable private AvailabilityAndPriorityNotifier notifier;
Review Comment:
What does a null value mean here?
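
One possible way to avoid the question entirely, sketched under the assumption that a no-op default is acceptable before registration. `NotifierSketch`, `AgentSketch`, and their methods are hypothetical names, not types from the PR.

```java
import java.util.Objects;

/** Hypothetical, simplified notifier with a single callback. */
interface NotifierSketch {
    void notifyAvailable(int subpartitionId);
}

/** Hypothetical agent whose notifier field is never null. */
final class AgentSketch {

    /** Assumed default: notifications are dropped until a real notifier is registered. */
    private static final NotifierSketch NO_OP = subpartitionId -> {};

    private NotifierSketch notifier = NO_OP;

    int buffersRead = 0;

    void registerNotifier(NotifierSketch notifier) {
        // Reject null at registration time instead of at every use site.
        this.notifier = Objects.requireNonNull(notifier, "notifier");
    }

    void onBufferRead(int subpartitionId) {
        buffersRead++;
        notifier.notifyAvailable(subpartitionId);
    }
}
```

The trade-off is that a missing registration is silently ignored rather than failing fast; if registration is mandatory, passing the notifier through the constructor into a `final` field may be the clearer option.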
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java:
##########
@@ -62,20 +74,36 @@ public void start() {
// noop
}
+ @Override
+ public void registerAvailabilityAndPriorityNotifier(AvailabilityAndPriorityNotifier notifier) {
+ this.notifier = notifier;
+ }
+
@Override
public Optional<Buffer> getNextBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId) {
+ Optional<Buffer> buffer = Optional.empty();
+ Tuple2<CompletableFuture<NettyConnectionReader>, Integer> readerAndBufferIndex =
+ nettyConnectionReaders.get(partitionId).get(subpartitionId);
try {
- return nettyConnectionReaders
- .get(partitionId)
- .get(subpartitionId)
- .get()
- .readBuffer(segmentId);
+ buffer = readerAndBufferIndex.f0.get().readBuffer(segmentId);
} catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed to get next buffer.", e);
+ ExceptionUtils.rethrow(e, "Failed to read buffer from memory tier.");
}
+ buffer.ifPresent(
+ value -> {
+ boolean isPriority = value.getDataType().hasPriority();
+ checkNotNull(notifier)
+ .notifyAvailableAndPriority(
+ partitionId,
+ subpartitionId,
+ isPriority,
+ isPriority ? readerAndBufferIndex.f1 : null);
+ readerAndBufferIndex.f1 += 1;
Review Comment:
The three tiers seem to use the notifier in much the same way. Why does this
logic live inside each tier? Is it simply because the consumer client doesn't
know the buffer index? If so, we could consider including the buffer index in
the return value.
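
A rough sketch of that idea, assuming the tier returns the buffer together with its index and the consumer client becomes the single place that notifies. `IndexedBuffer`, `TierAgentSketch`, and `ConsumerClientSketch` are hypothetical names, and a `String` payload with a `!` prefix stands in for a priority buffer.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical return value: a payload plus its index within the subpartition. */
final class IndexedBuffer {
    final String payload;
    final boolean priority;
    final int bufferIndex;

    IndexedBuffer(String payload, boolean priority, int bufferIndex) {
        this.payload = payload;
        this.priority = priority;
        this.bufferIndex = bufferIndex;
    }
}

/** Hypothetical tier agent: reads buffers and tracks the index, but never notifies. */
final class TierAgentSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private int nextBufferIndex = 0;

    void add(String payload) {
        pending.add(payload);
    }

    Optional<IndexedBuffer> getNextBuffer() {
        String payload = pending.poll();
        if (payload == null) {
            return Optional.empty();
        }
        // Assumption for this sketch only: a leading '!' marks a priority buffer.
        return Optional.of(new IndexedBuffer(payload, payload.startsWith("!"), nextBufferIndex++));
    }
}

/** Hypothetical consumer client: the only component that handles notification. */
final class ConsumerClientSketch {
    int lastPriorityBufferIndex = -1;
    int availabilityNotifications = 0;

    Optional<String> read(TierAgentSketch agent) {
        Optional<IndexedBuffer> next = agent.getNextBuffer();
        next.ifPresent(
                buffer -> {
                    availabilityNotifications++;
                    if (buffer.priority) {
                        lastPriorityBufferIndex = buffer.bufferIndex;
                    }
                });
        return next.map(buffer -> buffer.payload);
    }
}
```

With this shape the per-tier agents would not need a notifier reference at all, which would also make the nullable field unnecessary.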
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.