xintongsong commented on code in PR #22342: URL: https://github.com/apache/flink/pull/22342#discussion_r1220671319
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** + * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter} + * and disconnect netty connection in {@link TierProducerAgent}. + */ +public interface NettyProducerService { Review Comment: ```suggestion public interface NettyServiceProducer { ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** + * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter} + * and disconnect netty connection in {@link TierProducerAgent}. + */ +public interface NettyProducerService { + + /** + * Register a {@link NettyConnectionWriter} for a subpartition. + * + * @param subpartitionId subpartition id indicates the id of subpartition. + * @param nettyConnectionWriter writer is used to write buffers to netty connection. + */ + void registerNettyConnectionWriter( Review Comment: ```suggestion void connectionEstablished( ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** + * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter} + * and disconnect netty connection in {@link TierProducerAgent}. + */ +public interface NettyProducerService { + + /** + * Register a {@link NettyConnectionWriter} for a subpartition. + * + * @param subpartitionId subpartition id indicates the id of subpartition. + * @param nettyConnectionWriter writer is used to write buffers to netty connection. + */ + void registerNettyConnectionWriter( + TieredStorageSubpartitionId subpartitionId, + NettyConnectionWriter nettyConnectionWriter); + + /** + * Disconnect the netty connection related to the {@link NettyConnectionId}. + * + * @param connectionId connection id is the id of connection. + */ + void disconnectNettyConnection(NettyConnectionId connectionId); Review Comment: ```suggestion void connectionBroken(NettyConnectionId connectionId); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It + * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also + * indicate an error or a segment id. + */ +public class NettyPayload { + + // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be + // null, segmentId will be -1; + private Buffer buffer; + + // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId + // will be -1. + private Throwable error; Review Comment: What happens to the `@Nullable`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It + * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also + * indicate an error or a segment id. + */ +public class NettyPayload { + + // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be + // null, segmentId will be -1; + private Buffer buffer; + + // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId + // will be -1. + private Throwable error; + + // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId + // will be non-negative, segmentId will be -1. + private int bufferIndex = -1; + + // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex + // will be non-negative, segmentId will be -1. + private int subpartitionId = -1; + + // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId + // will be -1. 
+ private int segmentId = -1; + + public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) { + this.buffer = buffer; + this.bufferIndex = bufferIndex; + this.subpartitionId = subpartitionId; + } + + public NettyPayload(Throwable error) { + this.error = error; + } + + public NettyPayload(int segmentId) { + this.segmentId = segmentId; + } Review Comment: Should check the arguments. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It + * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also + * indicate an error or a segment id. 
+ */ +public class NettyPayload { + + // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be + // null, segmentId will be -1; + private Buffer buffer; + + // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId + // will be -1. + private Throwable error; + + // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId + // will be non-negative, segmentId will be -1. + private int bufferIndex = -1; + + // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex + // will be non-negative, segmentId will be -1. + private int subpartitionId = -1; + + // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId + // will be -1. + private int segmentId = -1; + + public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) { + this.buffer = buffer; + this.bufferIndex = bufferIndex; + this.subpartitionId = subpartitionId; + } + + public NettyPayload(Throwable error) { + this.error = error; + } + + public NettyPayload(int segmentId) { + this.segmentId = segmentId; + } Review Comment: I'd suggest to use factories rather than constructors, so that they can have meaningful names. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** + * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty + * connection. + */ +public interface NettyConnectionWriter { + /** + * Write a buffer to netty connection. + * + * @param nettyPayload buffer context represents the buffer. + */ + void writeBuffer(NettyPayload nettyPayload); + + /** + * Get the id of connection in the writer. + * + * @return the id of connection. + */ + NettyConnectionId getNettyConnectionId(); + + /** + * Get the number of existed buffers in the writer. The {@link NettyConnectionWriter} may be + * implemented based on a queue structure, this method is used to get the residual buffers in + * the writer. + * + * @return the buffer number. + */ + int numQueuedBuffers(); Review Comment: ``` The {@link NettyConnectionWriter} may be implemented based on a queue structure ``` 1. This belongs to the class javadoc. 2. What is this queue? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.BiConsumer; + +/** The default implementation of {@link NettyConnectionReader}. */ +public class NettyConnectionReaderImpl implements NettyConnectionReader { + + /** subpartitionId is used to indicate the id of subpartition. */ + private final int subpartitionId; + + /** + * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty + * connection. + */ + private final InputChannel[] inputChannels; + + /** + * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next + * round of reading from this channel. The Integer in the return type is the index of input + * channel and the Boolean in the return type is whether to queue the channel with priority. 
+ */ + private final BiConsumer<Integer, Boolean> queueChannelCallback; + + /** + * The array is used to record the latest sequence number of buffer with priority data type, + * which can decide outdated status of sequence number and whether to enqueue the related input + * channel to {@link SingleInputGate}. + */ + private final int[] lastPrioritySequenceNumber; Review Comment: 1. `queueChannelCallback` -> `availabilityAndPriorityNotifier`. 2. Should expose `lastPrioritySequenceNumber` via interfaces, rather than internal fields shared by two classes. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionId.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import java.util.Objects; + +/** {@link NettyConnectionId} indicates the unique id of netty connection. */ +public class NettyConnectionId { + + // The default id of netty connection. + private static int defaultId = 0; Review Comment: 1. Should not be named with "default". 2. The static field can be concurrently accessed, and would have consistency issues. 
I'd suggest using a large random value, or partitionId + subpartitionId + a small random value. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.BiConsumer; + +/** The default implementation of {@link NettyConnectionReader}. */ +public class NettyConnectionReaderImpl implements NettyConnectionReader { + + /** subpartitionId is used to indicate the id of subpartition. */ + private final int subpartitionId; + + /** + * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty + * connection. 
+ */ + private final InputChannel[] inputChannels; + + /** + * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next + * round of reading from this channel. The Integer in the return type is the index of input + * channel and the Boolean in the return type is whether to queue the channel with priority. + */ + private final BiConsumer<Integer, Boolean> queueChannelCallback; + + /** + * The array is used to record the latest sequence number of buffer with priority data type, + * which can decide outdated status of sequence number and whether to enqueue the related input + * channel to {@link SingleInputGate}. + */ + private final int[] lastPrioritySequenceNumber; + + /** The last required segment id. */ + private int lastRequiredSegmentId = 0; + + public NettyConnectionReaderImpl( + int subpartitionId, + InputChannel[] inputChannels, + BiConsumer<Integer, Boolean> queueChannelCallback, + int[] lastPrioritySequenceNumber) { + this.subpartitionId = subpartitionId; + this.inputChannels = inputChannels; + this.queueChannelCallback = queueChannelCallback; + this.lastPrioritySequenceNumber = lastPrioritySequenceNumber; + } + + @Override + public Optional<Buffer> readBuffer(int segmentId) { + if (segmentId > 0L && (segmentId != lastRequiredSegmentId)) { + lastRequiredSegmentId = segmentId; + inputChannels[subpartitionId].notifyRequiredSegmentId(segmentId); + } + Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = Optional.empty(); + try { + bufferAndAvailability = inputChannels[subpartitionId].getNextBuffer(); Review Comment: This doesn't seem right. - Number of channels corresponds to the upstream parallelism, while subpartitionId corresponds to the downstream parallelism. We cannot use subpartitionId to index an input channel. - If we only need to access `inputChannels[subpartitionId]` in this class, why would we need the entire array? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import 
java.util.function.BiConsumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The default implementation of {@link TieredStorageNettyService}. */ +public class TieredStorageNettyServiceImpl implements TieredStorageNettyService { + + private final Map<TieredStoragePartitionId, List<NettyProducerService>> + registeredProducerServices = new ConcurrentHashMap<>(); + + private final Map<NettyConnectionId, BufferAvailabilityListener> + registeredAvailabilityListeners = new ConcurrentHashMap<>(); + + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>> + registeredChannelIndexes = new ConcurrentHashMap<>(); + + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, InputChannel[]>> + registeredInputChannels = new ConcurrentHashMap<>(); + + private final Map< + TieredStoragePartitionId, + Map<TieredStorageSubpartitionId, BiConsumer<Integer, Boolean>>> + registeredQueueChannelCallbacks = new ConcurrentHashMap<>(); + + private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, int[]>> + registeredPriorityArrays = new ConcurrentHashMap<>(); Review Comment: Might be better to group them by producer and consumer. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It + * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also + * indicate an error or a segment id. + */ +public class NettyPayload { + + // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be + // null, segmentId will be -1; + private Buffer buffer; + + // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId + // will be -1. + private Throwable error; + + // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId + // will be non-negative, segmentId will be -1. + private int bufferIndex = -1; + + // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex + // will be non-negative, segmentId will be -1. + private int subpartitionId = -1; Review Comment: Use JavaDoc. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It + * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also + * indicate an error or a segment id. + */ +public class NettyPayload { Review Comment: Take a look at `BufferIndexOrError`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
