reswqa commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1222589878
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -311,6 +311,16 @@ public long unsynchronizedGetSizeOfQueuedBuffers() {
return 0;
}
+ /**
+ * Notify the upstream the id of required segment that should be sent to
netty connection.
Review Comment:
This sentence reads as ungrammatical 🤔
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderAvailabilityAndPriorityHelper.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+/**
+ * {@link NettyConnectionReaderAvailabilityAndPriorityHelper} is used to help
the reader notify the
+ * available and priority status of {@link NettyConnectionReader}, and update
the priority sequence
Review Comment:
```suggestion
* availability and priority status of {@link NettyConnectionReader}, and
update the priority sequence
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.util.Optional;
+
+/** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read
buffer from netty. */
+public interface NettyConnectionReader {
+ /**
+ * Read a buffer from netty connection.
+ *
+ * @param segmentId segment id indicates the id of segment.
+ * @return Optional.empty() will be returned if there is no buffer sent
from netty connection
Review Comment:
```suggestion
* @return {@link Optional#empty()} will be returned if there is no
buffer sent from netty connection
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in
tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+ /** The singleton instance of this event. */
+ public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();
Review Comment:
We should make the default constructor of this class private if it is meant to
be a singleton.
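A minimal sketch of what I mean, using a hypothetical stand-in class (`SingletonEventSketch` is not part of this PR; the real `EndOfSegmentEvent` extends `RuntimeEvent` and has more methods):

```java
// Hypothetical stand-in for EndOfSegmentEvent, reduced to the singleton part.
final class SingletonEventSketch {

    /** The singleton instance of this event. */
    static final SingletonEventSketch INSTANCE = new SingletonEventSketch();

    // Private constructor: code outside this class cannot create a second
    // instance, so INSTANCE stays the only one.
    private SingletonEventSketch() {}
}
```

With the constructor private, `new SingletonEventSketch()` outside the class no longer compiles.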
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write
buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and
netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+ /**
+ * Write a buffer to netty connection.
+ *
+ * @param nettyPayload netty payload represents the buffer.
Review Comment:
If it represents the buffer, why not use `Buffer` directly here? I'd prefer
changing this description to `the payload sent to the netty connection`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+ private static final int SUBPARTITION_ID = 0;
+
+ @Test
+ void testWriteBuffer() {
+ int bufferNumber = 10;
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+ assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+ }
+
+ @Test
+ void testGetNettyConnectionId() {
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+ }
+
+ @Test
+ void testNumQueuedBuffers() {
+ int bufferNumber = 10;
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+
assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
+ }
+
+ @Test
+ void testClose() {
+ int bufferNumber = 10;
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+ nettyConnectionWriter.close();
+ assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0);
+ }
+
+ private void writeBufferTpWriter(
Review Comment:
```suggestion
private void writeBufferToWriter(
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+ private final BiConsumer<TieredStorageSubpartitionId,
NettyConnectionWriter>
+ connectionEstablishConsumer;
Review Comment:
```suggestion
connectionEstablishedConsumer;
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link
NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+ /**
+ * Establish a netty connection for a subpartition.
Review Comment:
This Javadoc is a bit confusing. It reads as if this method proactively
establishes a connection, but in reality it is just a callback.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+ private final BiConsumer<TieredStorageSubpartitionId,
NettyConnectionWriter>
+ connectionEstablishConsumer;
+
+ private final Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+ public TestingNettyServiceProducer(
+ BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+ connectionEstablishConsumer,
+ Consumer<NettyConnectionId> connectionBrokenConsumer) {
+ this.connectionEstablishConsumer = connectionEstablishConsumer;
+ this.connectionBrokenConsumer = connectionBrokenConsumer;
+ }
+
+ @Override
+ public void connectionEstablished(
+ TieredStorageSubpartitionId subpartitionId,
+ NettyConnectionWriter nettyConnectionWriter) {
+ connectionEstablishConsumer.accept(subpartitionId,
nettyConnectionWriter);
+ }
+
+ @Override
+ public void connectionBroken(NettyConnectionId connectionId) {
+ connectionBrokenConsumer.accept(connectionId);
+ }
+
+ /** Builder for {@link TestingNettyServiceProducer}. */
+ public static class Builder {
+
+ private BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+ connectionEstablishConsumer;
+
+ private Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+ public Builder() {}
Review Comment:
Why do we need this constructor instead of the default one generated by the compiler?
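For illustration, a reduced sketch with hypothetical names (`TestingProducerSketch` and a `String` connection id stand in for the real types). The builder declares no constructor, so the compiler supplies a default no-arg one with the same access level as the class:

```java
import java.util.function.Consumer;

// Hypothetical, reduced version of a testing class with a builder.
final class TestingProducerSketch {

    private final Consumer<String> connectionBrokenConsumer;

    private TestingProducerSketch(Consumer<String> connectionBrokenConsumer) {
        this.connectionBrokenConsumer = connectionBrokenConsumer;
    }

    void connectionBroken(String connectionId) {
        connectionBrokenConsumer.accept(connectionId);
    }

    /** Builder without an explicit constructor; `new Builder()` still works. */
    static class Builder {

        // No-op default so build() works even if the setter is never called.
        private Consumer<String> connectionBrokenConsumer = id -> {};

        Builder setConnectionBrokenConsumer(Consumer<String> consumer) {
            this.connectionBrokenConsumer = consumer;
            return this;
        }

        TestingProducerSketch build() {
            return new TestingProducerSketch(connectionBrokenConsumer);
        }
    }
}
```

The no-op default for the consumer is my own addition for the sketch, not something the PR does.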
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
Review Comment:
```suggestion
assertThat(nettyPayload.getBuffer()).isPresent();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+ }
+
+ @Test
+ void getError() {
Review Comment:
```suggestion
void testGetError() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+ }
+
+ @Test
+ void getError() {
+ Throwable error = new RuntimeException("test exception");
+ NettyPayload nettyPayload = NettyPayload.newError(error);
+ assertThat(nettyPayload.getError()).isEqualTo(error);
+ }
+
+ @Test
+ void getBufferIndex() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+ }
+
+ @Test
+ void getSubpartitionId() {
Review Comment:
```suggestion
void testGetSubpartitionId() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+ }
+
+ @Test
+ void getError() {
+ Throwable error = new RuntimeException("test exception");
+ NettyPayload nettyPayload = NettyPayload.newError(error);
+ assertThat(nettyPayload.getError()).isEqualTo(error);
+ }
+
+ @Test
+ void getBufferIndex() {
Review Comment:
```suggestion
void testGetBufferIndex() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+ }
+
+ @Test
+ void getError() {
+ Throwable error = new RuntimeException("test exception");
+ NettyPayload nettyPayload = NettyPayload.newError(error);
+ assertThat(nettyPayload.getError()).isEqualTo(error);
+ }
+
+ @Test
+ void getBufferIndex() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+ }
+
+ @Test
+ void getSubpartitionId() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getSubpartitionId()).isEqualTo(subpartitionId);
+ }
+
+ @Test
+ void getSegmentId() {
Review Comment:
```suggestion
void testGetSegmentId() {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to
netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition
id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+ /**
+ * The data buffer. If the buffer is not null, bufferIndex and
subpartitionId will be
+ * non-negative, error will be null, segmentId will be -1;
+ */
+ @Nullable private final Buffer buffer;
+
+ /**
+ * The error information. If the error is not null, buffer will be null,
segmentId and
+ * bufferIndex and subpartitionId will be -1.
+ */
+ @Nullable private final Throwable error;
+
+ /**
+ * The index of buffer. If the buffer index is non-negative, buffer won't
be null, error will be
+ * null, subpartitionId will be non-negative, segmentId will be -1.
+ */
+ private final int bufferIndex;
+
+ /**
+ * The id of subpartition. If the subpartition id is non-negative, buffer
won't be null, error
+ * will be null, bufferIndex will be non-negative, segmentId will be -1.
+ */
+ private final int subpartitionId;
+
+ /**
+ * The id of segment. If the segment id is non-negative, buffer and error
be null, bufferIndex
Review Comment:
```suggestion
* The id of segment. If the segment id is non-negative, buffer and
error will be null, bufferIndex
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to
netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition
id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+ /**
+ * The data buffer. If the buffer is not null, bufferIndex and
subpartitionId will be
+ * non-negative, error will be null, segmentId will be -1;
+ */
+ @Nullable private final Buffer buffer;
+
+ /**
+ * The error information. If the error is not null, buffer will be null,
segmentId and
+ * bufferIndex and subpartitionId will be -1.
+ */
+ @Nullable private final Throwable error;
+
+ /**
+ * The index of buffer. If the buffer index is non-negative, buffer won't
be null, error will be
+ * null, subpartitionId will be non-negative, segmentId will be -1.
+ */
+ private final int bufferIndex;
+
+ /**
+ * The id of subpartition. If the subpartition id is non-negative, buffer
won't be null, error
+ * will be null, bufferIndex will be non-negative, segmentId will be -1.
+ */
+ private final int subpartitionId;
+
+ /**
+ * The id of segment. If the segment id is non-negative, buffer and error
be null, bufferIndex
+ * and subpartitionId will be -1.
+ */
+ private final int segmentId;
+
+ private NettyPayload(
+ @Nullable Buffer buffer,
+ int bufferIndex,
+ int subpartitionId,
+ @Nullable Throwable error,
+ int segmentId) {
+ this.buffer = buffer;
+ this.bufferIndex = bufferIndex;
+ this.subpartitionId = subpartitionId;
+ this.error = error;
+ this.segmentId = segmentId;
+ }
+
+ public static NettyPayload newBuffer(Buffer buffer, int bufferIndex, int
subpartitionId) {
+ checkState(buffer != null && bufferIndex != -1 && subpartitionId !=
-1);
+ return new NettyPayload(buffer, bufferIndex, subpartitionId, null, -1);
+ }
+
+ public static NettyPayload newError(Throwable error) {
+ checkState(error != null);
+ return new NettyPayload(null, -1, -1, error, -1);
Review Comment:
```suggestion
return new NettyPayload(null, -1, -1, checkNotNull(error), -1);
```
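To illustrate why the inline form is attractive, here is a minimal sketch. It uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`, since both return the checked reference and throw `NullPointerException` on null. `ErrorPayloadSketch` is a hypothetical class, not the real `NettyPayload`.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for the error variant of NettyPayload. */
final class ErrorPayloadSketch {

    private final Throwable error;

    private ErrorPayloadSketch(Throwable error) {
        this.error = error;
    }

    static ErrorPayloadSketch newError(Throwable error) {
        // The null check returns its argument, so validation and construction
        // fit in one expression and no separate checkState line is needed.
        return new ErrorPayloadSketch(
                Objects.requireNonNull(error, "error must not be null"));
    }

    Throwable getError() {
        return error;
    }
}
```

A side effect worth noting: a null argument now fails with `NullPointerException` rather than the `IllegalStateException` thrown by `checkState`.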
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+ private static final int INPUT_CHANNEL_INDEX = 0;
+
+ @Test
+ void testReadBufferOfNonPriorityDataType() {
+ int bufferNumber = 1;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isTrue();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadBufferOfPriorityDataType() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 2;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, true,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+ assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result1.f1).isEqualTo(true);
+ Tuple2<Integer, Integer> result2 =
prioritySequenceNumberConsumer.get();
+ assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result2.f1).isEqualTo(0);
+ }
+
+ @Test
+ void testReadEmptyBuffer() {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+ (inputChannelIndex, priority) ->
+ availableAndPriorityConsumer.complete(
+ Tuple2.of(inputChannelIndex,
priority)),
+ (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex,
sequenceNumber)));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadDifferentSegments() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ reader.readBuffer(0);
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ reader.readBuffer(1);
+ assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+ }
+
+ private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer) {
+ return (inputChannelIndex, priority) ->
+
availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+ }
+
+ private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer) {
+ return (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex, sequenceNumber));
+ }
+
+ private Supplier<InputChannel> createInputChannelSupplier(
+ int bufferNumber,
+ boolean priority,
+ CompletableFuture<Integer> requiredSegmentIdFuture) {
+ TestInputChannel inputChannel =
+ new TestInputChannel(
+ new SingleInputGateBuilder().build(),
+ INPUT_CHANNEL_INDEX,
+ requiredSegmentIdFuture);
+ try {
+ if (bufferNumber == 1) {
+ inputChannel.read(
+ new NetworkBuffer(
+
MemorySegmentFactory.allocateUnpooledSegment(0),
+ BufferRecycler.DummyBufferRecycler.INSTANCE,
+ priority ? PRIORITIZED_EVENT_BUFFER :
DATA_BUFFER),
+ NONE);
+ } else {
+ for (int index = 0; index < bufferNumber; ++index) {
+ inputChannel.read(
+ new NetworkBuffer(
+
MemorySegmentFactory.allocateUnpooledSegment(0),
+
BufferRecycler.DummyBufferRecycler.INSTANCE,
+ priority ? PRIORITIZED_EVENT_BUFFER :
DATA_BUFFER));
+ }
+ }
+ } catch (IOException | InterruptedException e) {
+ ExceptionUtils.rethrow(e, "Failed to create test input channel.");
+ }
+ return () -> inputChannel;
+ }
+
+ private NettyConnectionReader createNettyConnectionReader(
Review Comment:
Can be static.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.Queue;
+
+/** The default implementation of {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterImpl implements NettyConnectionWriter {
+
+ private final Queue<NettyPayload> bufferQueue;
+
+ private final NettyConnectionId connectionId;
+
+ public NettyConnectionWriterImpl(Queue<NettyPayload> bufferQueue) {
+ this.bufferQueue = bufferQueue;
+ this.connectionId = NettyConnectionId.newId();
+ }
+
+ @Override
+ public NettyConnectionId getNettyConnectionId() {
+ return connectionId;
+ }
+
+ @Override
+ public int numQueuedBuffers() {
+ return bufferQueue.size();
+ }
+
+ @Override
+ public void writeBuffer(NettyPayload nettyPayload) {
+ bufferQueue.add(nettyPayload);
+ }
+
+ @Override
+ public void close() {
+ NettyPayload nettyPayload;
+ while ((nettyPayload = bufferQueue.poll()) != null) {
Review Comment:
I wonder whether we can ensure that `bufferQueue` does not contain a
`Throwable`-type payload when it is closed. If not, what happens in that case?
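One possible way to make `close()` tolerate error payloads is sketched below. This is only an assumption about how it could look, not what the PR does: `SketchBuffer`, `SketchPayload` and `SketchWriter` are hypothetical stand-ins, and the sketch assumes the payload exposes its buffer as an `Optional`.

```java
import java.util.Optional;
import java.util.Queue;

/** Hypothetical buffer that only records whether it was recycled. */
final class SketchBuffer {
    private boolean recycled;

    void recycleBuffer() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical payload holding either a buffer or an error, never both. */
final class SketchPayload {
    private final SketchBuffer buffer; // null for error payloads
    private final Throwable error; // null for buffer payloads

    private SketchPayload(SketchBuffer buffer, Throwable error) {
        this.buffer = buffer;
        this.error = error;
    }

    static SketchPayload newBuffer(SketchBuffer buffer) {
        return new SketchPayload(buffer, null);
    }

    static SketchPayload newError(Throwable error) {
        return new SketchPayload(null, error);
    }

    Optional<SketchBuffer> getBuffer() {
        return Optional.ofNullable(buffer);
    }
}

/** Hypothetical writer whose close() drains the queue safely. */
final class SketchWriter {
    private final Queue<SketchPayload> queue;

    SketchWriter(Queue<SketchPayload> queue) {
        this.queue = queue;
    }

    void write(SketchPayload payload) {
        queue.add(payload);
    }

    void close() {
        SketchPayload payload;
        while ((payload = queue.poll()) != null) {
            // Error payloads carry no buffer, so they are simply dropped;
            // only buffer payloads need to be recycled.
            payload.getBuffer().ifPresent(SketchBuffer::recycleBuffer);
        }
    }
}
```

With this shape, a queue that mixes buffer and error payloads is drained without a `NullPointerException`, and every queued buffer is recycled.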
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write
buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and
netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+ /**
+ * Write a buffer to netty connection.
+ *
+ * @param nettyPayload netty payload represents the buffer.
+ */
+ void writeBuffer(NettyPayload nettyPayload);
+
+ /**
+ * Get the id of connection in the writer.
+ *
+ * @return the id of connection.
+ */
+ NettyConnectionId getNettyConnectionId();
+
+ /**
+ * Get the number of written but unsent buffers.
+ *
+ * @return the buffer number.
+ */
+ int numQueuedBuffers();
+
+ /** Close the connection and release all resources. */
+ void close();
Review Comment:
Maybe this interface can extend `AutoCloseable`.
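A rough sketch of what that could enable, using hypothetical `Sketch*` types and a `String` payload in place of `NettyPayload`: overriding `close()` without a `throws` clause keeps callers free of checked exceptions, and the writer becomes usable in try-with-resources.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, reduced version of the writer interface. */
interface SketchConnectionWriter extends AutoCloseable {

    void writeBuffer(String payload);

    int numQueuedBuffers();

    /** Narrowed override: no checked exception is declared. */
    @Override
    void close();
}

/** Hypothetical implementation backed by a simple queue. */
final class SketchConnectionWriterImpl implements SketchConnectionWriter {

    private final Queue<String> queue = new ArrayDeque<>();

    @Override
    public void writeBuffer(String payload) {
        queue.add(payload);
    }

    @Override
    public int numQueuedBuffers() {
        return queue.size();
    }

    @Override
    public void close() {
        // Release everything that is still queued.
        queue.clear();
    }
}

final class AutoCloseableSketch {

    /** Writes two payloads and returns the queue size seen before close. */
    static int writeAndClose(SketchConnectionWriterImpl writer) {
        try (SketchConnectionWriter w = writer) {
            w.writeBuffer("a");
            w.writeBuffer("b");
            return w.numQueuedBuffers();
        } // close() runs here, even if the body throws
    }
}
```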
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+ private static final int INPUT_CHANNEL_INDEX = 0;
+
+ @Test
+ void testReadBufferOfNonPriorityDataType() {
+ int bufferNumber = 1;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isTrue();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
Review Comment:
AssertJ has its own assertions for `Optional` and `CompletableFuture` (for
example `assertThat(buffer).isPresent()` and `assertThat(future).isNotDone()`);
please use those. Please also check all the other tests for the same issue.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+ private static final int INPUT_CHANNEL_INDEX = 0;
+
+ @Test
+ void testReadBufferOfNonPriorityDataType() {
+ int bufferNumber = 1;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isTrue();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadBufferOfPriorityDataType() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 2;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, true,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+ assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result1.f1).isEqualTo(true);
+ Tuple2<Integer, Integer> result2 =
prioritySequenceNumberConsumer.get();
+ assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result2.f1).isEqualTo(0);
+ }
+
+ @Test
+ void testReadEmptyBuffer() {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+ (inputChannelIndex, priority) ->
+ availableAndPriorityConsumer.complete(
+ Tuple2.of(inputChannelIndex,
priority)),
+ (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex,
sequenceNumber)));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadDifferentSegments() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ reader.readBuffer(0);
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ reader.readBuffer(1);
+ assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+ }
+
+ private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer) {
+ return (inputChannelIndex, priority) ->
+
availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+ }
+
+ private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer) {
+ return (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex, sequenceNumber));
+ }
+
+ private Supplier<InputChannel> createInputChannelSupplier(
+ int bufferNumber,
+ boolean priority,
+ CompletableFuture<Integer> requiredSegmentIdFuture) {
+ TestInputChannel inputChannel =
+ new TestInputChannel(
+ new SingleInputGateBuilder().build(),
+ INPUT_CHANNEL_INDEX,
+ requiredSegmentIdFuture);
+ try {
+ if (bufferNumber == 1) {
+ inputChannel.read(
+ new NetworkBuffer(
+
MemorySegmentFactory.allocateUnpooledSegment(0),
+ BufferRecycler.DummyBufferRecycler.INSTANCE,
Review Comment:
Why not use `FreeingBufferRecycler.INSTANCE`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+ }
+
+ @Test
+ void getError() {
+ Throwable error = new RuntimeException("test exception");
+ NettyPayload nettyPayload = NettyPayload.newError(error);
+ assertThat(nettyPayload.getError()).isEqualTo(error);
Review Comment:
```suggestion
assertThat(nettyPayload.getError()).hasValue(error);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+ private static final int SUBPARTITION_ID = 0;
+
+ @Test
+ void testWriteBuffer() {
+ int bufferNumber = 10;
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+ assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+ }
+
+ @Test
+ void testGetNettyConnectionId() {
+ ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+ NettyConnectionWriter nettyConnectionWriter =
+ new NettyConnectionWriterImpl(nettyPayloadQueue);
+ assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+ }
+
+ @Test
+ void testNumQueuedBuffers() {
Review Comment:
I wonder whether we can merge this method into `testWriteBuffer`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link
NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+ /**
+ * Establish a netty connection for a subpartition.
+ *
+ * @param subpartitionId subpartition id indicates the id of subpartition.
+ * @param nettyConnectionWriter writer is used to write buffers to netty
connection.
+ */
+ void connectionEstablished(
+ TieredStorageSubpartitionId subpartitionId,
+ NettyConnectionWriter nettyConnectionWriter);
+
+ /**
+ * Break the netty connection related to the {@link NettyConnectionId}.
Review Comment:
Ditto.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionView.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+
+/**
+ * The {@link TieredStoreResultSubpartitionView} is the implementation of
{@link
+ * ResultSubpartitionView} of {@link TieredResultPartition}.
+ */
+public class TieredStoreResultSubpartitionView implements
ResultSubpartitionView {
+
+ private final BufferAvailabilityListener availabilityListener;
+
+ private final List<Queue<NettyPayload>> nettyPayloadQueues;
+
+ private final List<NettyServiceProducer> serviceProducers;
+
+ private final List<NettyConnectionId> nettyConnectionIds;
+
+ private boolean isReleased = false;
Review Comment:
Is this field thread-safe?
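To make the concern concrete: a plain `boolean` written by one thread is not guaranteed to become visible to another. Below is a minimal sketch, assuming the view can be released from a different thread than the one that reads the flag (the diff does not show which threads touch `isReleased`). `ReleasableView` is a hypothetical stand-in, not the class in this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a view whose release flag is shared between threads. */
class ReleasableView {

    // AtomicBoolean provides cross-thread visibility and an atomic "release once" check.
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    /** Returns true only for the first caller; later calls leave the state unchanged. */
    boolean release() {
        return isReleased.compareAndSet(false, true);
    }

    boolean isReleased() {
        return isReleased.get();
    }
}
```

If the flag is only ever accessed from a single thread, a plain field is fine, but a comment stating that would help.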
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+ private static final int INPUT_CHANNEL_INDEX = 0;
+
+ @Test
+ void testReadBufferOfNonPriorityDataType() {
+ int bufferNumber = 1;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isTrue();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadBufferOfPriorityDataType() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 2;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, true,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+ assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result1.f1).isEqualTo(true);
+ Tuple2<Integer, Integer> result2 =
prioritySequenceNumberConsumer.get();
+ assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result2.f1).isEqualTo(0);
+ }
+
+ @Test
+ void testReadEmptyBuffer() {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+ (inputChannelIndex, priority) ->
+ availableAndPriorityConsumer.complete(
+ Tuple2.of(inputChannelIndex,
priority)),
+ (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex,
sequenceNumber)));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadDifferentSegments() throws ExecutionException,
InterruptedException {
Review Comment:
There is too much duplicated code in these tests. I'd prefer to extract and
reuse their common parts.
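As one possible direction (a sketch, not a required design): the two `create...Consumer` helpers differ only in their type arguments, so a single generic helper could replace both. `TestCallbacks` and `completing` are hypothetical names, and `Map.Entry` stands in for Flink's `Tuple2` only to keep the snippet self-contained.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical test helper shared by all test methods. */
class TestCallbacks {

    /** Returns a callback that completes the future with the pair of arguments it receives. */
    static <A, B> BiConsumer<A, B> completing(CompletableFuture<Map.Entry<A, B>> future) {
        return (first, second) -> future.complete(new SimpleImmutableEntry<>(first, second));
    }
}
```

The repeated future and reader setup could be moved into a shared setup method or a small fixture object in the same way.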
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
Review Comment:
We should make the no-argument constructor of this class private.
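A minimal sketch of what I mean, assuming the testing class is meant to be created through a builder (the `BiConsumer` and `Consumer` imports hint at that, but the rest of the class is not shown here). `TestingProducer` and its method names are hypothetical.

```java
import java.util.function.Consumer;

/** Hypothetical testing stub: instances can only be obtained through the builder. */
class TestingProducer {

    private final Consumer<String> connectionBrokenConsumer;

    // Private, and no no-argument constructor exists, so callbacks can never be left unset.
    private TestingProducer(Consumer<String> connectionBrokenConsumer) {
        this.connectionBrokenConsumer = connectionBrokenConsumer;
    }

    void connectionBroken(String connectionId) {
        connectionBrokenConsumer.accept(connectionId);
    }

    static Builder builder() {
        return new Builder();
    }

    static class Builder {
        // Defaults to a no-op so tests only configure the callbacks they care about.
        private Consumer<String> connectionBrokenConsumer = id -> {};

        Builder setConnectionBrokenConsumer(Consumer<String> consumer) {
            this.connectionBrokenConsumer = consumer;
            return this;
        }

        TestingProducer build() {
            return new TestingProducer(connectionBrokenConsumer);
        }
    }
}
```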
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
Review Comment:
```suggestion
void testGetBuffer() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+ private static final int INPUT_CHANNEL_INDEX = 0;
+
+ @Test
+ void testReadBufferOfNonPriorityDataType() {
+ int bufferNumber = 1;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isTrue();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadBufferOfPriorityDataType() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 2;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, true,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isTrue();
+ assertThat(buffer.get().isBuffer()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+ assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result1.f1).isEqualTo(true);
+ Tuple2<Integer, Integer> result2 =
prioritySequenceNumberConsumer.get();
+ assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+ assertThat(result2.f1).isEqualTo(0);
+ }
+
+ @Test
+ void testReadEmptyBuffer() {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+ (inputChannelIndex, priority) ->
+ availableAndPriorityConsumer.complete(
+ Tuple2.of(inputChannelIndex,
priority)),
+ (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex,
sequenceNumber)));
+ Optional<Buffer> buffer = reader.readBuffer(0);
+ assertThat(buffer.isPresent()).isFalse();
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+ assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+ }
+
+ @Test
+ void testReadDifferentSegments() throws ExecutionException,
InterruptedException {
+ int bufferNumber = 0;
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer =
+ new CompletableFuture<>();
+ CompletableFuture<Integer> requiredSegmentIdFuture = new
CompletableFuture<>();
+ Supplier<InputChannel> inputChannelSupplier =
+ createInputChannelSupplier(bufferNumber, false,
requiredSegmentIdFuture);
+ NettyConnectionReader reader =
+ createNettyConnectionReader(
+ inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+ reader.readBuffer(0);
+ assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+ reader.readBuffer(1);
+ assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+ }
+
+ private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+ CompletableFuture<Tuple2<Integer, Boolean>>
availableAndPriorityConsumer) {
+ return (inputChannelIndex, priority) ->
+
availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+ }
+
+ private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+ CompletableFuture<Tuple2<Integer, Integer>>
prioritySequenceNumberConsumer) {
+ return (inputChannelIndex, sequenceNumber) ->
+ prioritySequenceNumberConsumer.complete(
+ Tuple2.of(inputChannelIndex, sequenceNumber));
+ }
+
+ private Supplier<InputChannel> createInputChannelSupplier(
+ int bufferNumber,
+ boolean priority,
+ CompletableFuture<Integer> requiredSegmentIdFuture) {
+ TestInputChannel inputChannel =
+ new TestInputChannel(
+ new SingleInputGateBuilder().build(),
+ INPUT_CHANNEL_INDEX,
+ requiredSegmentIdFuture);
+ try {
+ if (bufferNumber == 1) {
Review Comment:
I wonder why we need to make a distinction here. It seems that this is
only for deciding the `nextDataType`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements
TieredStorageNettyService {
+
+ // ------------------------------------
+ // For producer side
+ // ------------------------------------
+
+ private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+ registeredServiceProducers = new ConcurrentHashMap<>();
+
+ private final Map<NettyConnectionId, BufferAvailabilityListener>
+ registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+ // ------------------------------------
+ // For consumer side
+ // ------------------------------------
+
+ private final Map<TieredStoragePartitionId,
Map<TieredStorageSubpartitionId, Integer>>
+ registeredChannelIndexes = new ConcurrentHashMap<>();
+
+ private final Map<
+ TieredStoragePartitionId,
+ Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+ registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+ private final Map<
+ TieredStoragePartitionId,
+ Map<
+ TieredStorageSubpartitionId,
+
NettyConnectionReaderAvailabilityAndPriorityHelper>>
+ registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public void registerProducer(
+ TieredStoragePartitionId partitionId, NettyServiceProducer
serviceProducer) {
+ List<NettyServiceProducer> serviceProducers =
+ registeredServiceProducers.getOrDefault(partitionId, new
ArrayList<>());
+ serviceProducers.add(serviceProducer);
+ registeredServiceProducers.put(partitionId, serviceProducers);
Review Comment:
I'd prefer using a pattern like
`registeredServiceProducers.computeIfAbsent().add()`.
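Roughly as sketched below. With `getOrDefault` followed by `put`, two threads registering for the same partition can each create their own list and one registration is lost. `ProducerRegistry` is a hypothetical stand-in with `String` keys and values for brevity, and the `CopyOnWriteArrayList` is my assumption that the list itself may also be accessed concurrently.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical registry keyed by partition id. */
class ProducerRegistry {

    private final Map<String, List<String>> registeredProducers = new ConcurrentHashMap<>();

    void registerProducer(String partitionId, String producer) {
        // On a ConcurrentHashMap, computeIfAbsent creates the list at most once per key,
        // so concurrent registrations for the same key all append to the same list.
        registeredProducers
                .computeIfAbsent(partitionId, ignored -> new CopyOnWriteArrayList<>())
                .add(producer);
    }

    List<String> getProducers(String partitionId) {
        return registeredProducers.getOrDefault(partitionId, Collections.emptyList());
    }
}
```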
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
Review Comment:
```suggestion
Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer(0);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+ @Test
+ void getBuffer() {
+ Buffer buffer =
+ new NetworkBuffer(
+ MemorySegmentFactory.allocateUnpooledSegment(0),
+ FreeingBufferRecycler.INSTANCE,
+ Buffer.DataType.DATA_BUFFER,
+ 0);
+ int bufferIndex = 0;
+ int subpartitionId = 1;
+ NettyPayload nettyPayload = NettyPayload.newBuffer(buffer,
bufferIndex, subpartitionId);
+ assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+ assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
Review Comment:
```suggestion
assertThat(nettyPayload.getBuffer()).hasValue(buffer);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
Review Comment:
We should also test some incorrect cases.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements
TieredStorageNettyService {
Review Comment:
I am a bit uneasy about this class. We rely too much on `ConcurrentHashMap`
instead of locks, which means we need to check the code carefully to ensure
that compound operations are atomic. Additionally, since some methods do not
have a caller in production code yet, it is not known on which threads they
will be called, so it is best to add comments documenting that.
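For illustration only, here is a sketch of the lock-based alternative, assuming that several maps must be updated together for one subpartition (I have not verified which of the maps in this class actually need that). `ChannelRegistry` and its fields are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical service that must keep two maps consistent with each other. */
class ChannelRegistry {

    private final Object lock = new Object();

    // Guarded by lock: plain HashMaps suffice because every access holds the lock.
    private final Map<String, Integer> channelIndexes = new HashMap<>();
    private final Map<String, String> channelProviders = new HashMap<>();

    /** May be called from any thread; both maps are updated as one atomic step. */
    void register(String subpartitionId, int channelIndex, String provider) {
        synchronized (lock) {
            if (channelIndexes.containsKey(subpartitionId)) {
                throw new IllegalStateException("Already registered: " + subpartitionId);
            }
            channelIndexes.put(subpartitionId, channelIndex);
            channelProviders.put(subpartitionId, provider);
        }
    }

    /** Never observes a state in which only one of the two maps has the entry. */
    boolean isFullyRegistered(String subpartitionId) {
        synchronized (lock) {
            return channelIndexes.containsKey(subpartitionId)
                    && channelProviders.containsKey(subpartitionId);
        }
    }
}
```

With separate concurrent maps, a reader could see the index registered but not yet the provider; a single lock removes that intermediate state at the cost of some contention.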