xintongsong commented on code in PR #22316:
URL: https://github.com/apache/flink/pull/22316#discussion_r1181374074

##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245:
##########
@@ -173,6 +173,7 @@ org.apache.flink.runtime.io.network.api.CheckpointBarrier does not satisfy: anno
 org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfPartitionEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
+org.apache.flink.runtime.io.network.api.EndOfSegmentEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated

Review Comment:
   Should not introduce a new violation. Instead, we should annotate `EndOfSegmentEvent` as `@Internal`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in Tiered Store shuffle
+ * mode.
+ */
+public class EndOfSegmentEvent extends RuntimeEvent {

Review Comment:
   IIUC, the reason we need this is:
   - to differentiate netty messages that carry events from those that carry data
   - to deserialize events and dispatch them to the proper components for handling, according to the event type

   I think this is only needed for tiers that transmit data via netty. Therefore, it's probably more suitable for a future PR where we plan to introduce the reusable netty service.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java:
##########
@@ -895,6 +898,52 @@ public String toString() {
         }
     }

+    /** Message to notify producer about the required segment id. */
+    static class SegmentId extends NettyMessage {

Review Comment:
   Same here. I think this commit does not belong to this PR. We should introduce this together with the reusable netty service.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBufferReaderAdapter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link SingleInputGateBufferReaderAdapter} includes the logic of reading buffer in the {@link
+ * SingleInputGate}.
+ */
+public interface SingleInputGateBufferReaderAdapter extends Closeable {

Review Comment:
   IIUC, the purpose of this adapter is to allow `InputGate` to read data via a set of tiered storage components, just as it reads data directly from `InputChannel` in the existing shuffle modes.

   In general, I think the adapter approach is smart. However, the defined interface might not be optimal. E.g., what if we need to read data from remote storage rather than from an input channel?

   Moreover, depending on the final interface, the name can be improved.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStoreSubpartitionConsumerClient.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link TieredStoreSubpartitionConsumerClient} is used to consume buffer from a single
+ * subpartition in tiered store.
+ */
+public class TieredStoreSubpartitionConsumerClient {

Review Comment:
   It's not very clear to me why we need this class. Is it simply for wrapping the states of a specific subpartition?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -32,6 +34,8 @@ public class TieredStorageConsumerClient implements SingleInputGateBufferReaderA

     private final TieredStoreSubpartitionConsumerClient[] tieredStoreSubpartitionReaders;

+    private final LocalTierFactory localTierFactory = new LocalTierFactory();

Review Comment:
   1. What exactly is a *local tier*?
   2. The storage client should not be aware of any specific tier factory. All tiers should be black boxes to the storage client.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -18,5 +18,61 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

-/** Client of the Tiered Storage used by the consumer. */
-public class TieredStorageConsumerClient {}
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBufferReaderAdapter;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** {@link TieredStorageConsumerClient} is used to read buffer from the tiered store. */
+public class TieredStorageConsumerClient implements SingleInputGateBufferReaderAdapter {

Review Comment:
   `SingleInputGateBufferReaderAdapter` is not yet introduced at this commit.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -18,5 +18,61 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

-/** Client of the Tiered Storage used by the consumer. */
-public class TieredStorageConsumerClient {}
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBufferReaderAdapter;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** {@link TieredStorageConsumerClient} is used to read buffer from the tiered store. */
+public class TieredStorageConsumerClient implements SingleInputGateBufferReaderAdapter {
+
+    private final TieredStoreSubpartitionConsumerClient[] tieredStoreSubpartitionReaders;
+
+    public TieredStorageConsumerClient(
+            int numInputChannels, Consumer<Integer> queueChannelReceiver) {
+        this.tieredStoreSubpartitionReaders =
+                new TieredStoreSubpartitionConsumerClient[numInputChannels];
+        for (int i = 0; i < numInputChannels; ++i) {
+            tieredStoreSubpartitionReaders[i] =
+                    new TieredStoreSubpartitionConsumerClient(
+                            createTierConsumerAgents(), queueChannelReceiver);
+        }
+    }
+
+    @Override
+    public void start() {
+        // TODO: the remote tier monitor will start in this method.
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (TieredStoreSubpartitionConsumerClient tieredStoreSubpartitionReader :
+                tieredStoreSubpartitionReaders) {
+            tieredStoreSubpartitionReader.close();
+        }
+    }
+
+    @Override
+    public Optional<InputChannel.BufferAndAvailability> getNextBuffer(InputChannel inputChannel)

Review Comment:
   The tiered storage client should not be aware of input channels, which are a Flink shuffle concept.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -289,7 +289,10 @@ enum DataType {
          * Indicates that this subpartition state is fully recovered (emitted). Further data can be
          * consumed after unblocking.
          */
-        RECOVERY_COMPLETION(false, true, true, false, false);
+        RECOVERY_COMPLETION(false, true, true, false, false),
+
+        /** {@link #ADD_SEGMENT_ID_EVENT} indicates that a segment is finished in a subpartition. */
+        ADD_SEGMENT_ID_EVENT(false, true, false, false, false);

Review Comment:
   Why is this different from end-of-segment?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java:
##########
@@ -18,5 +18,21 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;

+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.io.Closeable;
+import java.util.Optional;
+
 /** The consumer-side agent of a Tier. */
-public interface TierConsumerAgent {}
+public interface TierConsumerAgent extends Closeable {
+
+    /**
+     * Get buffer from the client according to specific subpartition and segment id.
+     *
+     * @param inputChannel indicates the subpartition to read.
+     * @param segmentId indicate the id of segment.
+     * @return the next buffer.
+     */
+    Optional<InputChannel.BufferAndAvailability> getNextBuffer(
+            InputChannel inputChannel, int segmentId);

Review Comment:
   The tier consumer agent should be aware of neither `InputChannel` nor `BufferAndAvailability`.

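One possible reading of the comment above, as a minimal sketch and not the interface the PR actually adopted: the agent takes plain subpartition and segment ids and returns a plain buffer, so no shuffle-layer type leaks into the tier. Everything named here is an assumption for illustration only: `TierAgentSketch`, `InMemoryTierAgent`, the `subpartitionId` parameter, and `java.nio.ByteBuffer` standing in for Flink's `Buffer`.

```java
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

public class TierAgentSketch {

    /** Hypothetical agent interface: plain ids in, plain buffers out. */
    interface TierConsumerAgent extends Closeable {
        Optional<ByteBuffer> getNextBuffer(int subpartitionId, int segmentId);

        @Override
        void close(); // narrowed for this sketch: no checked exception
    }

    /** Toy in-memory tier, used only to exercise the interface. */
    static final class InMemoryTierAgent implements TierConsumerAgent {
        private final Map<Long, Queue<ByteBuffer>> segments = new HashMap<>();

        void add(int subpartitionId, int segmentId, String payload) {
            segments.computeIfAbsent(key(subpartitionId, segmentId), k -> new ArrayDeque<>())
                    .add(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
        }

        @Override
        public Optional<ByteBuffer> getNextBuffer(int subpartitionId, int segmentId) {
            Queue<ByteBuffer> queue = segments.get(key(subpartitionId, segmentId));
            // poll() returns null once the queue is drained, which maps to Optional.empty().
            return queue == null ? Optional.empty() : Optional.ofNullable(queue.poll());
        }

        @Override
        public void close() {
            segments.clear();
        }

        // Packs both ids into one map key.
        private static long key(int subpartitionId, int segmentId) {
            return ((long) subpartitionId << 32) | (segmentId & 0xFFFFFFFFL);
        }
    }

    /** The caller (e.g. an input gate adapter) would own any channel-specific wrapping. */
    static String readAsString(TierConsumerAgent agent, int subpartitionId, int segmentId) {
        return agent.getNextBuffer(subpartitionId, segmentId)
                .map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
                .orElse("<none>");
    }

    public static void main(String[] args) {
        InMemoryTierAgent agent = new InMemoryTierAgent();
        agent.add(0, 0, "hello");
        System.out.println(readAsString(agent, 0, 0));
        System.out.println(readAsString(agent, 0, 0));
        agent.close();
    }
}
```

With a shape like this, translating a subpartition id back to an `InputChannel`, and attaching availability information, would stay on the shuffle side of the adapter.
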
##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245:
##########
@@ -173,6 +173,7 @@ org.apache.flink.runtime.io.network.api.CheckpointBarrier does not satisfy: anno
 org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfPartitionEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
+org.apache.flink.runtime.io.network.api.EndOfSegmentEvent does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated

Review Comment:
   Actually, I don't think any shuffle event should belong to the `api` package. We can probably deal with this later, as it's not introduced by this PR.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
