xintongsong commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1220671319


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {

Review Comment:
   ```suggestion
   public interface NettyServiceProducer {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {
+
+    /**
+     * Register a {@link NettyConnectionWriter} for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty connection.
+     */
+    void registerNettyConnectionWriter(

Review Comment:
   ```suggestion
       void connectionEstablished(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {
+
+    /**
+     * Register a {@link NettyConnectionWriter} for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty connection.
+     */
+    void registerNettyConnectionWriter(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter);
+
+    /**
+     * Disconnect the netty connection related to the {@link NettyConnectionId}.
+     *
+     * @param connectionId connection id is the id of connection.
+     */
+    void disconnectNettyConnection(NettyConnectionId connectionId);

Review Comment:
   ```suggestion
       void connectionBroken(NettyConnectionId connectionId);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;

Review Comment:
   What happens to the `@Nullable`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;
+
+    // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId
+    // will be -1.
+    private int segmentId = -1;
+
+    public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public NettyPayload(Throwable error) {
+        this.error = error;
+    }
+
+    public NettyPayload(int segmentId) {
+        this.segmentId = segmentId;
+    }

Review Comment:
   Should check the arguments.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;
+
+    // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId
+    // will be -1.
+    private int segmentId = -1;
+
+    public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public NettyPayload(Throwable error) {
+        this.error = error;
+    }
+
+    public NettyPayload(int segmentId) {
+        this.segmentId = segmentId;
+    }

Review Comment:
   I'd suggest using static factory methods rather than constructors, so that each way of creating a payload can have a meaningful name.
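A minimal sketch of what named factories with argument checks could look like. `PayloadSketch` and its method names are hypothetical stand-ins, and the buffer is typed as `Object` so the example compiles without Flink on the classpath; inside Flink, the checks would more likely use the `Preconditions` helpers.

```java
import java.util.Objects;

/**
 * Illustrative stand-in for NettyPayload. All names here are hypothetical and
 * the buffer is typed as Object instead of Flink's Buffer.
 */
final class PayloadSketch {
    private final Object buffer; // non-null only for buffer payloads
    private final Throwable error; // non-null only for error payloads
    private final int bufferIndex; // -1 unless this is a buffer payload
    private final int subpartitionId; // -1 unless this is a buffer payload
    private final int segmentId; // -1 unless this is a segment payload

    // Private constructor: callers must go through the named factories below.
    private PayloadSketch(
            Object buffer, Throwable error, int bufferIndex, int subpartitionId, int segmentId) {
        this.buffer = buffer;
        this.error = error;
        this.bufferIndex = bufferIndex;
        this.subpartitionId = subpartitionId;
        this.segmentId = segmentId;
    }

    static PayloadSketch newBuffer(Object buffer, int bufferIndex, int subpartitionId) {
        Objects.requireNonNull(buffer, "buffer");
        checkNonNegative(bufferIndex, "bufferIndex");
        checkNonNegative(subpartitionId, "subpartitionId");
        return new PayloadSketch(buffer, null, bufferIndex, subpartitionId, -1);
    }

    static PayloadSketch newError(Throwable error) {
        Objects.requireNonNull(error, "error");
        return new PayloadSketch(null, error, -1, -1, -1);
    }

    static PayloadSketch newSegment(int segmentId) {
        checkNonNegative(segmentId, "segmentId");
        return new PayloadSketch(null, null, -1, -1, segmentId);
    }

    private static void checkNonNegative(int value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " must be non-negative, was " + value);
        }
    }

    boolean isBuffer() {
        return buffer != null;
    }

    boolean isError() {
        return error != null;
    }

    int getBufferIndex() {
        return bufferIndex;
    }

    int getSubpartitionId() {
        return subpartitionId;
    }

    int getSegmentId() {
        return segmentId;
    }
}
```

With this shape a call site reads as `PayloadSketch.newSegment(3)` rather than `new PayloadSketch(3)`, and the three variants can no longer be confused through constructor overloading.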



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload buffer context represents the buffer.
+     */
+    void writeBuffer(NettyPayload nettyPayload);
+
+    /**
+     * Get the id of connection in the writer.
+     *
+     * @return the id of connection.
+     */
+    NettyConnectionId getNettyConnectionId();
+
+    /**
+     * Get the number of existed buffers in the writer. The {@link NettyConnectionWriter} may be
+     * implemented based on a queue structure, this method is used to get the residual buffers in
+     * the writer.
+     *
+     * @return the buffer number.
+     */
+    int numQueuedBuffers();

Review Comment:
   ```
   The {@link NettyConnectionWriter} may be implemented based on a queue structure
   ```
   1. This belongs in the class Javadoc.
   2. What is this queue?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    /** subpartitionId is used to indicate the id of subpartition. */
+    private final int subpartitionId;
+
+    /**
+     * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty
+     * connection.
+     */
+    private final InputChannel[] inputChannels;
+
+    /**
+     * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next
+     * round of reading from this channel. The Integer in the return type is the index of input
+     * channel and the Boolean in the return type is whether to queue the channel with priority.
+     */
+    private final BiConsumer<Integer, Boolean> queueChannelCallback;
+
+    /**
+     * The array is used to record the latest sequence number of buffer with priority data type,
+     * which can decide outdated status of sequence number and whether to enqueue the related input
+     * channel to {@link SingleInputGate}.
+     */
+    private final int[] lastPrioritySequenceNumber;

Review Comment:
   1. `queueChannelCallback` -> `availabilityAndPriorityNotifier`.
   2. Should expose `lastPrioritySequenceNumber` via interfaces, rather than internal fields shared by two classes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionId.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import java.util.Objects;
+
+/** {@link NettyConnectionId} indicates the unique id of netty connection. */
+public class NettyConnectionId {
+
+    // The default id of netty connection.
+    private static int defaultId = 0;

Review Comment:
   1. Should not be named with "default".
   2. The static field can be accessed concurrently, and would have consistency issues.
   
   I'd suggest using a large random value, or partitionId + subpartitionId + a small random value.
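One possible reading of the "large random value" option, sketched with `java.util.UUID` so that no shared mutable counter is needed. `ConnectionIdSketch` and `newId()` are hypothetical names for illustration, not the API the PR ended up with.

```java
import java.util.UUID;

/** Hypothetical connection id backed by a random UUID instead of a static counter. */
final class ConnectionIdSketch {
    private final UUID id;

    private ConnectionIdSketch(UUID id) {
        this.id = id;
    }

    /** Creates a fresh id; UUID.randomUUID() is safe to call from multiple threads. */
    static ConnectionIdSketch newId() {
        return new ConnectionIdSketch(UUID.randomUUID());
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ConnectionIdSketch)) {
            return false;
        }
        return id.equals(((ConnectionIdSketch) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "ConnectionIdSketch{" + id + "}";
    }
}
```

Because every id is drawn independently, there is no static state to synchronize; the trade-off is that ids are no longer sequential or human-friendly.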



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    /** subpartitionId is used to indicate the id of subpartition. */
+    private final int subpartitionId;
+
+    /**
+     * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty
+     * connection.
+     */
+    private final InputChannel[] inputChannels;
+
+    /**
+     * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next
+     * round of reading from this channel. The Integer in the return type is the index of input
+     * channel and the Boolean in the return type is whether to queue the channel with priority.
+     */
+    private final BiConsumer<Integer, Boolean> queueChannelCallback;
+
+    /**
+     * The array is used to record the latest sequence number of buffer with priority data type,
+     * which can decide outdated status of sequence number and whether to enqueue the related input
+     * channel to {@link SingleInputGate}.
+     */
+    private final int[] lastPrioritySequenceNumber;
+
+    /** The last required segment id. */
+    private int lastRequiredSegmentId = 0;
+
+    public NettyConnectionReaderImpl(
+            int subpartitionId,
+            InputChannel[] inputChannels,
+            BiConsumer<Integer, Boolean> queueChannelCallback,
+            int[] lastPrioritySequenceNumber) {
+        this.subpartitionId = subpartitionId;
+        this.inputChannels = inputChannels;
+        this.queueChannelCallback = queueChannelCallback;
+        this.lastPrioritySequenceNumber = lastPrioritySequenceNumber;
+    }
+
+    @Override
+    public Optional<Buffer> readBuffer(int segmentId) {
+        if (segmentId > 0L && (segmentId != lastRequiredSegmentId)) {
+            lastRequiredSegmentId = segmentId;
+            inputChannels[subpartitionId].notifyRequiredSegmentId(segmentId);
+        }
+        Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = Optional.empty();
+        try {
+            bufferAndAvailability = inputChannels[subpartitionId].getNextBuffer();

Review Comment:
   This doesn't seem right.
   - The number of channels corresponds to the upstream parallelism, while subpartitionId corresponds to the downstream parallelism. We cannot use subpartitionId to index an input channel.
   - If we only need to access `inputChannels[subpartitionId]` in this class, why would we need the entire array?
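To illustrate the second point, here is a rough sketch in which the reader receives exactly one channel, already resolved by the caller, instead of the whole array plus an index. `ChannelSketch` and `SingleChannelReaderSketch` are hypothetical stand-ins and deliberately much simpler than Flink's `InputChannel`.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for an input channel; not Flink's InputChannel. */
interface ChannelSketch {
    Optional<String> getNextBuffer();
}

/** Reader that depends on a single channel chosen by whoever constructs it. */
final class SingleChannelReaderSketch {
    private final ChannelSketch channel;

    SingleChannelReaderSketch(ChannelSketch channel) {
        this.channel = Objects.requireNonNull(channel, "channel");
    }

    /** Delegates to the one channel; no array indexing happens inside the reader. */
    Optional<String> readBuffer() {
        return channel.getNextBuffer();
    }
}
```

The code that creates the reader then decides which channel to hand over, using the channel index rather than the subpartition id, and the reader itself cannot index out of bounds.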



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    private final Map<TieredStoragePartitionId, List<NettyProducerService>>
+            registeredProducerServices = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, InputChannel[]>>
+            registeredInputChannels = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, BiConsumer<Integer, Boolean>>>
+            registeredQueueChannelCallbacks = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, int[]>>
+            registeredPriorityArrays = new ConcurrentHashMap<>();

Review Comment:
   Might be better to group them by producer and consumer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;

Review Comment:
   Use JavaDoc.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {

Review Comment:
   Take a look at `BufferIndexOrError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
