reswqa commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1222589878


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -311,6 +311,16 @@ public long unsynchronizedGetSizeOfQueuedBuffers() {
         return 0;
     }
 
+    /**
+     * Notify the upstream the id of required segment that should be sent to netty connection.

Review Comment:
   This sentence reads a bit oddly 🤔
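
   One possible rewording of the quoted Javadoc, just as a suggestion (the exact wording is up to the author):

   ```java
    /**
     * Notifies the upstream of the id of the segment that is required to be sent over the
     * netty connection.
     */
   ```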



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderAvailabilityAndPriorityHelper.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+/**
+ * {@link NettyConnectionReaderAvailabilityAndPriorityHelper} is used to help the reader notify the
+ * available and priority status of {@link NettyConnectionReader}, and update the priority sequence

Review Comment:
   ```suggestion
    * availability and priority status of {@link NettyConnectionReader}, and update the priority sequence
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.util.Optional;
+
+/** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read buffer from netty. */
+public interface NettyConnectionReader {
+    /**
+     * Read a buffer from netty connection.
+     *
+     * @param segmentId segment id indicates the id of segment.
+     * @return Optional.empty() will be returned if there is no buffer sent from netty connection

Review Comment:
   ```suggestion
        * @return {@link Optional#empty()} will be returned if there is no buffer sent from netty connection
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+    /** The singleton instance of this event. */
+    public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();

Review Comment:
   We should disable (make private) the default constructor of this class if it is meant to be a singleton.
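
   A minimal sketch of the pattern (a generic example, `SingletonEvent` is a made-up name, and the actual `EndOfSegmentEvent` would also keep its `RuntimeEvent` overrides):

   ```java
   // Generic singleton sketch: the private constructor prevents further instantiation,
   // so INSTANCE stays the only instance of the class.
   public final class SingletonEvent {

       public static final SingletonEvent INSTANCE = new SingletonEvent();

       private SingletonEvent() {}
   }
   ```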



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload netty payload represents the buffer.

Review Comment:
   If it represents the buffer, why not use `Buffer` directly here? I'd prefer changing this description to `the payload to be sent to the netty connection`.
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testWriteBuffer() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testGetNettyConnectionId() {
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+    }
+
+    @Test
+    void testNumQueuedBuffers() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testClose() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        nettyConnectionWriter.close();
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0);
+    }
+
+    private void writeBufferTpWriter(

Review Comment:
   ```suggestion
       private void writeBufferToWriter(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishConsumer;

Review Comment:
   ```suggestion
               connectionEstablishedConsumer;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+    /**
+     * Establish a netty connection for a subpartition.

Review Comment:
   This Javadoc is a bit confusing: it looks like this method will proactively establish a connection, but in reality it is just a callback.
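
   One possible rewording that makes the callback nature explicit (the wording is only a suggestion; the signature is the one from this PR):

   ```java
   /**
    * Invoked when a netty connection for the given subpartition has been established,
    * so that the producer agent can register the corresponding {@link NettyConnectionWriter}.
    */
   void connectionEstablished(
           TieredStorageSubpartitionId subpartitionId,
           NettyConnectionWriter nettyConnectionWriter);
   ```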



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishConsumer;
+
+    private final Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+    public TestingNettyServiceProducer(
+            BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+                    connectionEstablishConsumer,
+            Consumer<NettyConnectionId> connectionBrokenConsumer) {
+        this.connectionEstablishConsumer = connectionEstablishConsumer;
+        this.connectionBrokenConsumer = connectionBrokenConsumer;
+    }
+
+    @Override
+    public void connectionEstablished(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter) {
+        connectionEstablishConsumer.accept(subpartitionId, nettyConnectionWriter);
+    }
+
+    @Override
+    public void connectionBroken(NettyConnectionId connectionId) {
+        connectionBrokenConsumer.accept(connectionId);
+    }
+
+    /** Builder for {@link TestingNettyServiceProducer}. */
+    public static class Builder {
+
+        private BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+                connectionEstablishConsumer;
+
+        private Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+        public Builder() {}

Review Comment:
   Why do we need this constructor instead of the default one generated by the compiler?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();

Review Comment:
   ```suggestion
           assertThat(nettyPayload.getBuffer()).isPresent();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {

Review Comment:
   ```suggestion
       void testGetError() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+    }
+
+    @Test
+    void getSubpartitionId() {

Review Comment:
   ```suggestion
       void testGetSubpartitionId() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {

Review Comment:
   ```suggestion
       void testGetBufferIndex() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+    }
+
+    @Test
+    void getSubpartitionId() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getSubpartitionId()).isEqualTo(subpartitionId);
+    }
+
+    @Test
+    void getSegmentId() {

Review Comment:
   ```suggestion
       void testGetSegmentId() {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    /**
+     * The data buffer. If the buffer is not null, bufferIndex and subpartitionId will be
+     * non-negative, error will be null, segmentId will be -1;
+     */
+    @Nullable private final Buffer buffer;
+
+    /**
+     * The error information. If the error is not null, buffer will be null, segmentId and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    @Nullable private final Throwable error;
+
+    /**
+     * The index of buffer. If the buffer index is non-negative, buffer won't be null, error will be
+     * null, subpartitionId will be non-negative, segmentId will be -1.
+     */
+    private final int bufferIndex;
+
+    /**
+     * The id of subpartition. If the subpartition id is non-negative, buffer won't be null, error
+     * will be null, bufferIndex will be non-negative, segmentId will be -1.
+     */
+    private final int subpartitionId;
+
+    /**
+     * The id of segment. If the segment id is non-negative, buffer and error be null, bufferIndex

Review Comment:
   ```suggestion
        * The id of segment. If the segment id is non-negative, buffer and error will be null, bufferIndex
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    /**
+     * The data buffer. If the buffer is not null, bufferIndex and subpartitionId will be
+     * non-negative, error will be null, segmentId will be -1;
+     */
+    @Nullable private final Buffer buffer;
+
+    /**
+     * The error information. If the error is not null, buffer will be null, segmentId and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    @Nullable private final Throwable error;
+
+    /**
+     * The index of buffer. If the buffer index is non-negative, buffer won't be null, error will be
+     * null, subpartitionId will be non-negative, segmentId will be -1.
+     */
+    private final int bufferIndex;
+
+    /**
+     * The id of subpartition. If the subpartition id is non-negative, buffer won't be null, error
+     * will be null, bufferIndex will be non-negative, segmentId will be -1.
+     */
+    private final int subpartitionId;
+
+    /**
+     * The id of segment. If the segment id is non-negative, buffer and error be null, bufferIndex
+     * and subpartitionId will be -1.
+     */
+    private final int segmentId;
+
+    private NettyPayload(
+            @Nullable Buffer buffer,
+            int bufferIndex,
+            int subpartitionId,
+            @Nullable Throwable error,
+            int segmentId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+        this.error = error;
+        this.segmentId = segmentId;
+    }
+
+    public static NettyPayload newBuffer(Buffer buffer, int bufferIndex, int subpartitionId) {
+        checkState(buffer != null && bufferIndex != -1 && subpartitionId != -1);
+        return new NettyPayload(buffer, bufferIndex, subpartitionId, null, -1);
+    }
+
+    public static NettyPayload newError(Throwable error) {
+        checkState(error != null);
+        return new NettyPayload(null, -1, -1, error, -1);

Review Comment:
   ```suggestion
           return new NettyPayload(null, -1, -1, checkNotNull(error), -1);
   ```
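
   (`checkNotNull` would need a static import from `org.apache.flink.util.Preconditions`, next to the existing `checkState` import.)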



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {
+                inputChannel.read(
+                        new NetworkBuffer(
+                                MemorySegmentFactory.allocateUnpooledSegment(0),
+                                BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                priority ? PRIORITIZED_EVENT_BUFFER : DATA_BUFFER),
+                        NONE);
+            } else {
+                for (int index = 0; index < bufferNumber; ++index) {
+                    inputChannel.read(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                    priority ? PRIORITIZED_EVENT_BUFFER : DATA_BUFFER));
+                }
+            }
+        } catch (IOException | InterruptedException e) {
+            ExceptionUtils.rethrow(e, "Failed to create test input channel.");
+        }
+        return () -> inputChannel;
+    }
+
+    private NettyConnectionReader createNettyConnectionReader(

Review Comment:
   Can be static.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.Queue;
+
+/** The default implementation of {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterImpl implements NettyConnectionWriter {
+
+    private final Queue<NettyPayload> bufferQueue;
+
+    private final NettyConnectionId connectionId;
+
+    public NettyConnectionWriterImpl(Queue<NettyPayload> bufferQueue) {
+        this.bufferQueue = bufferQueue;
+        this.connectionId = NettyConnectionId.newId();
+    }
+
+    @Override
+    public NettyConnectionId getNettyConnectionId() {
+        return connectionId;
+    }
+
+    @Override
+    public int numQueuedBuffers() {
+        return bufferQueue.size();
+    }
+
+    @Override
+    public void writeBuffer(NettyPayload nettyPayload) {
+        bufferQueue.add(nettyPayload);
+    }
+
+    @Override
+    public void close() {
+        NettyPayload nettyPayload;
+        while ((nettyPayload = bufferQueue.poll()) != null) {

Review Comment:
   I wonder whether we can ensure that `bufferQueue` does not contain `Throwable`-type payloads when we close it. If not, what will happen in that case?
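
   For illustration only, a sketch of the concern (not the PR's code): if error payloads can end up in the queue, the drain loop should probably recycle only payloads that actually carry a buffer, e.g.

   ```java
   // Sketch: recycle only buffer-carrying payloads while draining the queue on close,
   // so that a Throwable payload does not break the close path.
   NettyPayload nettyPayload;
   while ((nettyPayload = bufferQueue.poll()) != null) {
       nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer);
   }
   ```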



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload netty payload represents the buffer.
+     */
+    void writeBuffer(NettyPayload nettyPayload);
+
+    /**
+     * Get the id of connection in the writer.
+     *
+     * @return the id of connection.
+     */
+    NettyConnectionId getNettyConnectionId();
+
+    /**
+     * Get the number of written but unsent buffers.
+     *
+     * @return the buffer number.
+     */
+    int numQueuedBuffers();
+
+    /** Close the connection and release all resources. */
+    void close();

Review Comment:
   Maybe this interface can extend `AutoCloseable`.
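
   A minimal sketch of what that could look like, assuming `close()` is narrowed so it does not throw a checked exception:

   ```java
   // Sketch only: extending AutoCloseable would let callers use try-with-resources.
   public interface NettyConnectionWriter extends AutoCloseable {

       void writeBuffer(NettyPayload nettyPayload);

       NettyConnectionId getNettyConnectionId();

       int numQueuedBuffers();

       /** Close the connection and release all resources. */
       @Override
       void close();
   }
   ```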



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();

Review Comment:
   AssertJ has specific assertions for `Optional` and `CompletableFuture`; please use those. Please also check for the same issue in all the other tests.
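
   For example, the assertions above could read as follows with AssertJ's dedicated `Optional` and `CompletableFuture` assertions (same variables as in the quoted test):

   ```java
   // Sketch using AssertJ's Optional/CompletableFuture assertions instead of manual unwrapping.
   assertThat(buffer).hasValueSatisfying(b -> assertThat(b.isBuffer()).isTrue());
   assertThat(requiredSegmentIdFuture).isNotDone();
   assertThat(availableAndPriorityConsumer).isNotDone();
   assertThat(prioritySequenceNumberConsumer).isNotDone();
   ```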



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = 
prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, 
InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                
availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {
+                inputChannel.read(
+                        new NetworkBuffer(
+                                
MemorySegmentFactory.allocateUnpooledSegment(0),
+                                BufferRecycler.DummyBufferRecycler.INSTANCE,

Review Comment:
   Why not use `FreeingBufferRecycler.INSTANCE`?
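   In other words, the test buffer could be created roughly like this (a sketch; the data type and size arguments are assumed to match the original call):
   ```java
   new NetworkBuffer(
           MemorySegmentFactory.allocateUnpooledSegment(0),
           FreeingBufferRecycler.INSTANCE,
           DATA_BUFFER,
           0)
   ```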



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, 
bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);

Review Comment:
   ```suggestion
           assertThat(nettyPayload.getError()).hasValue(error);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testWriteBuffer() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testGetNettyConnectionId() {
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+    }
+
+    @Test
+    void testNumQueuedBuffers() {

Review Comment:
   I wonder whether we can merge this method into `testWriteBuffer`?
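   Something along these lines, perhaps (a sketch that assumes the writer exposes a `numQueuedBuffers()` accessor, as the name of the separate test suggests):
   ```java
   @Test
   void testWriteBuffer() {
       int bufferNumber = 10;
       ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
       NettyConnectionWriter nettyConnectionWriter =
               new NettyConnectionWriterImpl(nettyPayloadQueue);
       writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
       // verify both the backing queue and the writer's own counter in one test;
       // numQueuedBuffers() is assumed from the name of the merged test
       assertThat(nettyPayloadQueue).hasSize(bufferNumber);
       assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
   }
   ```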



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link 
NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+    /**
+     * Establish a netty connection for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty 
connection.
+     */
+    void connectionEstablished(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter);
+
+    /**
+     * Break the netty connection related to the {@link NettyConnectionId}.

Review Comment:
   Ditto.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionView.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+
+/**
+ * The {@link TieredStoreResultSubpartitionView} is the implementation of 
{@link
+ * ResultSubpartitionView} of {@link TieredResultPartition}.
+ */
+public class TieredStoreResultSubpartitionView implements 
ResultSubpartitionView {
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private final List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private final List<NettyServiceProducer> serviceProducers;
+
+    private final List<NettyConnectionId> nettyConnectionIds;
+
+    private boolean isReleased = false;

Review Comment:
   Is this field thread-safe?
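   If it can indeed be touched from both the task thread and a netty thread, the minimal fix would be something like (sketch):
   ```java
   // volatile so that a release on one thread is visible to reads on another
   private volatile boolean isReleased = false;
   ```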



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = 
prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, 
InterruptedException {

Review Comment:
   There is too much duplicated code in these tests. I'd prefer extracting and reusing the common parts.
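   For example, the whole setup could live in one helper (a sketch built only from the private methods that already exist in this test):
   ```java
   private NettyConnectionReader createReader(
           int bufferNumber,
           boolean priority,
           CompletableFuture<Integer> requiredSegmentIdFuture,
           CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer,
           CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
       // bundle the channel supplier and the two consumers that every test builds by hand
       Supplier<InputChannel> inputChannelSupplier =
               createInputChannelSupplier(bufferNumber, priority, requiredSegmentIdFuture);
       return createNettyConnectionReader(
               inputChannelSupplier,
               createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
               createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
   }
   ```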



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {

Review Comment:
   We should make the no-argument constructor of this class private so it cannot be instantiated directly.
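   For instance (a sketch; it assumes instances are meant to be created through a builder rather than directly):
   ```java
   /** Only the builder should create instances of this testing implementation. */
   private TestingNettyServiceProducer() {
   }
   ```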



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {

Review Comment:
   ```suggestion
       void testGetBuffer() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = 
prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, 
sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, 
InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new 
CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> 
availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                
availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> 
prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {

Review Comment:
   I wonder why we need to make a distinction here? It seems this is only used to decide the `nextDataType`.
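   If so, the helper could simply take the data type instead of the buffer count, e.g. (a sketch; it assumes `TestInputChannel#read` accepts the next data type, as in the original helper):
   ```java
   private Supplier<InputChannel> createInputChannelSupplier(
           Buffer.DataType nextDataType, CompletableFuture<Integer> requiredSegmentIdFuture) {
       TestInputChannel inputChannel =
               new TestInputChannel(
                       new SingleInputGateBuilder().build(),
                       INPUT_CHANNEL_INDEX,
                       requiredSegmentIdFuture);
       try {
           // enqueue a single data buffer and let the caller decide the next data type
           inputChannel.read(
                   new NetworkBuffer(
                           MemorySegmentFactory.allocateUnpooledSegment(0),
                           FreeingBufferRecycler.INSTANCE,
                           DATA_BUFFER,
                           0),
                   nextDataType);
       } catch (Exception e) {
           ExceptionUtils.rethrow(e);
       }
       return () -> inputChannel;
   }
   ```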



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements 
TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    // ------------------------------------
+    //          For consumer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, 
Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+            registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<
+                            TieredStorageSubpartitionId,
+                            
NettyConnectionReaderAvailabilityAndPriorityHelper>>
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+                    new ConcurrentHashMap<>();
+
+    @Override
+    public void registerProducer(
+            TieredStoragePartitionId partitionId, NettyServiceProducer 
serviceProducer) {
+        List<NettyServiceProducer> serviceProducers =
+                registeredServiceProducers.getOrDefault(partitionId, new 
ArrayList<>());
+        serviceProducers.add(serviceProducer);
+        registeredServiceProducers.put(partitionId, serviceProducers);

Review Comment:
   I'd prefer using a pattern like `registeredServiceProducers.computeIfAbsent().add()`.
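   That is (sketch):
   ```java
   registeredServiceProducers
           .computeIfAbsent(partitionId, ignore -> new ArrayList<>())
           .add(serviceProducer);
   ```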



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);

Review Comment:
   ```suggestion
           Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer(0);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, 
bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);

Review Comment:
   ```suggestion
       assertThat(nettyPayload.getBuffer()).hasValue(buffer);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {

Review Comment:
   We should also test some error cases.
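   For instance, something like this (a sketch that assumes an error payload simply carries no buffer rather than throwing):
   ```java
   @Test
   void testGetBufferFromErrorPayload() {
       NettyPayload errorPayload = NettyPayload.newError(new RuntimeException("expected failure"));
       // assumption: the buffer of an error-only payload is absent
       assertThat(errorPayload.getBuffer()).isEmpty();
       assertThat(errorPayload.getError()).isPresent();
   }
   ```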



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements 
TieredStorageNettyService {

Review Comment:
   I am a bit uneasy about this class. We rely heavily on `ConcurrentHashMap` instead of locks, which means we need to check the code carefully to ensure that compound operations stay atomic. Additionally, since some methods do not yet have callers in the production code, it is not clear on which threads they will be invoked, so it would be best to add some comments about that.
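   As an example of the kind of note I have in mind (a sketch only; the actual threading contract is an assumption here and has to be confirmed against the callers):
   ```java
   /**
    * Registers a {@link NettyServiceProducer} for the given partition.
    *
    * <p>Threading: assumed to be called from the thread that sets up the result
    * partition, before any netty thread starts consuming from it. The backing map is
    * concurrent, but the get-or-create-then-add sequence still needs to stay atomic
    * (e.g. via {@code computeIfAbsent}).
    */
   ```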



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
