xintongsong commented on code in PR #22342: URL: https://github.com/apache/flink/pull/22342#discussion_r1218798506
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoragePartitionIdAndSubpartitionId.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; + +/** The combination of {@link TieredStoragePartitionId} and {@link TieredStorageSubpartitionId}. */ +public class TieredStoragePartitionIdAndSubpartitionId { Review Comment: 1. This should belong to the `common` package 2. Might make sense to implement `TieredStorageDataIdentifier` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */ +public interface NettyConnectionWriter { + /** + * Write a buffer. Review Comment: ```suggestion * Write a buffer to the netty connection. ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */ +public interface NettyConnectionWriter { + /** + * Write a buffer. + * + * @param bufferContext buffer context represents the buffer. + */ + void writeBuffer(BufferContext bufferContext); + + /** + * Get the number of existed buffers in the writer. + * + * @return the buffer number. + */ + int size(); Review Comment: ```suggestion int numQueuedBuffers(); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition + * id, and it could also indicate an error or a segment id. + */ +public class BufferContext { Review Comment: This is not a tiered storage concept. Maybe it belongs to the netty package. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition + * id, and it could also indicate an error or a segment id. + */ +public class BufferContext { + + private Buffer buffer; + + private Throwable error; + + private int bufferIndex; + + private int subpartitionId; + + private int segmentId = -1; Review Comment: 1. Need javadoc. 2. Should these be final? 3. Some of the fields should be `@Nullable`, and should explain the semantic of null values. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */ +public interface NettyConnectionWriter { + /** + * Write a buffer. + * + * @param bufferContext buffer context represents the buffer. + */ + void writeBuffer(BufferContext bufferContext); + + /** + * Get the number of existed buffers in the writer. + * + * @return the buffer number. + */ + int size(); Review Comment: We should explain in the class JavaDoc that the buffers are firstly queued locally before sent to the receiver. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */ +public interface NettyConnectionWriter { + /** + * Write a buffer. + * + * @param bufferContext buffer context represents the buffer. + */ + void writeBuffer(BufferContext bufferContext); + + /** + * Get the number of existed buffers in the writer. + * + * @return the buffer number. + */ + int size(); + + /** Close the writer and recycle all buffers. */ Review Comment: ```suggestion /** Close the connection and release all resources. */ ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; + +import java.util.Optional; + +/** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read buffer from netty. */ +public interface NettyConnectionReader { + /** + * Read a buffer. + * + * @param segmentId segment id indicates the id of segment. + * @return a buffer. + */ + Optional<Buffer> readBuffer(int segmentId); Review Comment: What does it mean if an empty optional is returned? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/impl/NettyConnectionReaderImpl.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl; Review Comment: The `impl` package might not be necessary. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/impl/NettyConnectionReaderImpl.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader; +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.BiConsumer; + +/** The default implementation of {@link NettyConnectionReader}. */ +public class NettyConnectionReaderImpl implements NettyConnectionReader { + + private final int subpartitionId; + private final InputChannel[] inputChannels; + private final BiConsumer<Integer, Boolean> queueChannelCallback; + private final int[] lastPrioritySequenceNumber; + private int lastRequiredSegmentId = 0; Review Comment: We need to explain what are these things? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java: ########## @@ -63,6 +67,8 @@ public class TieredResultPartition extends ResultPartition { private final TieredStorageResourceRegistry tieredStorageResourceRegistry; + private final TieredStorageNettyService nettyService; Review Comment: Let's change this to `TieredStorageNettyServiceImpl` to make it explicit that the result partition sees the actual netty service implementation. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java: ########## @@ -311,6 +311,12 @@ public long unsynchronizedGetSizeOfQueuedBuffers() { return 0; } + // ------------------------------------------------------------------------ + // For tiered storage + // ------------------------------------------------------------------------ + + public void notifyRequiredSegmentId(int segmentId) {} Review Comment: What is the semantic here? Who should implement it? If this will be completed in future, we should add a `TODO` mark. Or if this is meant to be overridden, we should explain it in comment. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +/** + * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition + * id, and it could also indicate an error or a segment id. + */ +public class BufferContext { Review Comment: Maybe rename it to something like `NettyPayload`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
