xintongsong commented on code in PR #22316:
URL: https://github.com/apache/flink/pull/22316#discussion_r1181374074


##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245:
##########
@@ -173,6 +173,7 @@ org.apache.flink.runtime.io.network.api.CheckpointBarrier 
does not satisfy: anno
 org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated 
with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfPartitionEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
+org.apache.flink.runtime.io.network.api.EndOfSegmentEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated

Review Comment:
   This change should not introduce a new violation. Instead, we should annotate 
`EndOfSegmentEvent` as `@Internal`.
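   
   A minimal sketch of the suggestion, not the actual change. `Internal` and 
`RuntimeEvent` below are local stand-ins so the snippet compiles on its own; in 
Flink they would be `org.apache.flink.annotation.Internal` and the 
`RuntimeEvent` already imported in this PR. The stand-in uses runtime retention 
only so that the small check can run, which is an assumption of this sketch and 
says nothing about Flink's own annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in for org.apache.flink.annotation.Internal (assumption: runtime
// retention is chosen here only to make the reflective check below possible).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Internal {}

// Stand-in for org.apache.flink.runtime.event.RuntimeEvent.
abstract class RuntimeEvent {}

/** Notifies the downstream to switch tiers in Tiered Store shuffle mode. */
@Internal
class EndOfSegmentEvent extends RuntimeEvent {}

// Hypothetical helper that mimics the kind of check the architecture test performs.
class AnnotationCheck {
    static boolean isInternal(Class<?> type) {
        return type.isAnnotationPresent(Internal.class);
    }
}
```

   With the annotation in place, the new line in the violation store should no 
longer be needed.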



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in 
Tiered Store shuffle
+ * mode.
+ */
+public class EndOfSegmentEvent extends RuntimeEvent {

Review Comment:
   IIUC, the reasons we need this are:
   - to differentiate netty messages that carry events from those that carry data
   - to deserialize events and dispatch them to the proper components for handling 
according to the event type
   
   I think this is only needed for tiers that transmit data via netty. 
Therefore, it's probably more suitable for a future PR where we plan to 
introduce the reusable netty service.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java:
##########
@@ -895,6 +898,52 @@ public String toString() {
         }
     }
 
+    /** Message to notify producer about the required segment id. */
+    static class SegmentId extends NettyMessage {

Review Comment:
   Same here. I think this commit does not belong to this PR. We should 
introduce this together with the reusable netty service.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBufferReaderAdapter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link SingleInputGateBufferReaderAdapter} includes the logic of reading 
buffer in the {@link
+ * SingleInputGate}.
+ */
+public interface SingleInputGateBufferReaderAdapter extends Closeable {

Review Comment:
   IIUC, the purpose of this adapter is to allow `InputGate` to read data via 
a set of tiered storage components, just as it reads data directly from 
`InputChannel` in the existing shuffle modes.
   
   In general, I think the adapter approach is smart. However, the defined 
interface might not be optimal. E.g., what if we need to read data from remote 
storage rather than from an input channel? Moreover, depending on the final 
shape of the interface, the name can be improved.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStoreSubpartitionConsumerClient.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link TieredStoreSubpartitionConsumerClient} is used to consume buffer 
from a single
+ * subpartition in tiered store.
+ */
+public class TieredStoreSubpartitionConsumerClient {

Review Comment:
   It's not very clear to me why we need this class. Is it simply for 
wrapping the state of a specific subpartition?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -32,6 +34,8 @@ public class TieredStorageConsumerClient implements 
SingleInputGateBufferReaderA
 
     private final TieredStoreSubpartitionConsumerClient[] 
tieredStoreSubpartitionReaders;
 
+    private final LocalTierFactory localTierFactory = new LocalTierFactory();

Review Comment:
   1. What exactly is a *local tier*?
   2. The storage client should not be aware of any specific tier factory. All 
tiers should be black boxes to the storage client.
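   
   A rough sketch of what I have in mind for point 2, under stated assumptions: 
`TierFactory`, `createConsumerAgent()`, `describe()`, `numTiers()` and 
`tierNames()` are hypothetical names used for illustration, and the shapes of 
`TierConsumerAgent` and `TieredStorageConsumerClient` are simplified. The point 
is only that the factories are passed in, so the client never names a concrete 
tier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the consumer-side agent of a tier.
interface TierConsumerAgent {
    // Hypothetical method, used here only to make the tiers observable.
    String describe();
}

// Hypothetical factory abstraction; the client depends on this interface only.
interface TierFactory {
    TierConsumerAgent createConsumerAgent();
}

class TieredStorageConsumerClient {
    private final List<TierConsumerAgent> agents = new ArrayList<>();

    // The caller decides which tiers exist; the client treats them as black boxes.
    TieredStorageConsumerClient(List<TierFactory> tierFactories) {
        for (TierFactory factory : tierFactories) {
            agents.add(factory.createConsumerAgent());
        }
    }

    int numTiers() {
        return agents.size();
    }

    // Returns the tier descriptions in the order the factories were supplied.
    List<String> tierNames() {
        List<String> names = new ArrayList<>();
        for (TierConsumerAgent agent : agents) {
            names.add(agent.describe());
        }
        return Collections.unmodifiableList(names);
    }
}
```

   Whoever constructs the client (e.g. from configuration) would supply the 
list, so adding or removing a tier would not touch the client.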



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -18,5 +18,61 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
-/** Client of the Tiered Storage used by the consumer. */
-public class TieredStorageConsumerClient {}
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBufferReaderAdapter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** {@link TieredStorageConsumerClient} is used to read buffer from the tiered 
store. */
+public class TieredStorageConsumerClient implements 
SingleInputGateBufferReaderAdapter {

Review Comment:
   `SingleInputGateBufferReaderAdapter` is not yet introduced at this commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -18,5 +18,61 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
-/** Client of the Tiered Storage used by the consumer. */
-public class TieredStorageConsumerClient {}
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBufferReaderAdapter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** {@link TieredStorageConsumerClient} is used to read buffer from the tiered 
store. */
+public class TieredStorageConsumerClient implements 
SingleInputGateBufferReaderAdapter {
+
+    private final TieredStoreSubpartitionConsumerClient[] 
tieredStoreSubpartitionReaders;
+
+    public TieredStorageConsumerClient(
+            int numInputChannels, Consumer<Integer> queueChannelReceiver) {
+        this.tieredStoreSubpartitionReaders =
+                new TieredStoreSubpartitionConsumerClient[numInputChannels];
+        for (int i = 0; i < numInputChannels; ++i) {
+            tieredStoreSubpartitionReaders[i] =
+                    new TieredStoreSubpartitionConsumerClient(
+                            createTierConsumerAgents(), queueChannelReceiver);
+        }
+    }
+
+    @Override
+    public void start() {
+        // TODO: the remote tier monitor will start in this method.
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (TieredStoreSubpartitionConsumerClient 
tieredStoreSubpartitionReader :
+                tieredStoreSubpartitionReaders) {
+            tieredStoreSubpartitionReader.close();
+        }
+    }
+
+    @Override
+    public Optional<InputChannel.BufferAndAvailability> 
getNextBuffer(InputChannel inputChannel)

Review Comment:
   The tiered storage client should not be aware of input channels, which are a 
Flink shuffle concept.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -289,7 +289,10 @@ enum DataType {
          * Indicates that this subpartition state is fully recovered 
(emitted). Further data can be
          * consumed after unblocking.
          */
-        RECOVERY_COMPLETION(false, true, true, false, false);
+        RECOVERY_COMPLETION(false, true, true, false, false),
+
+        /** {@link #ADD_SEGMENT_ID_EVENT} indicates that a segment is finished 
in a subpartition. */
+        ADD_SEGMENT_ID_EVENT(false, true, false, false, false);

Review Comment:
   Why is this different from end-of-segment?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java:
##########
@@ -18,5 +18,21 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.io.Closeable;
+import java.util.Optional;
+
 /** The consumer-side agent of a Tier. */
-public interface TierConsumerAgent {}
+public interface TierConsumerAgent extends Closeable {
+
+    /**
+     * Get buffer from the client according to specific subpartition and 
segment id.
+     *
+     * @param inputChannel indicates the subpartition to read.
+     * @param segmentId indicate the id of segment.
+     * @return the next buffer.
+     */
+    Optional<InputChannel.BufferAndAvailability> getNextBuffer(
+            InputChannel inputChannel, int segmentId);

Review Comment:
   The tier consumer agent should be aware of neither `InputChannel` nor 
`BufferAndAvailability`.
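   
   One possible shape, as a sketch only: the agent is addressed by plain ids and 
returns a plain buffer, and the shuffle-specific wrapping happens on the input 
gate side. Everything here is a hypothetical stand-in: `Buffer` is a simplified 
placeholder for the real buffer type, `ChannelBuffer` stands in for whatever 
the gate needs (such as `BufferAndAvailability`), and `GateSideAdapter` and the 
`subpartitionId` parameter are illustrative names.

```java
import java.util.Optional;

// Simplified placeholder for the network buffer type.
final class Buffer {
    final byte[] data;

    Buffer(byte[] data) {
        this.data = data;
    }
}

// The agent only knows ids and buffers, no shuffle concepts.
interface TierConsumerAgent {
    Optional<Buffer> getNextBuffer(int subpartitionId, int segmentId);
}

// Gate-side wrapper type; the only place that carries channel information.
final class ChannelBuffer {
    final int channelIndex;
    final Buffer buffer;

    ChannelBuffer(int channelIndex, Buffer buffer) {
        this.channelIndex = channelIndex;
        this.buffer = buffer;
    }
}

// Lives next to the input gate and translates between the two worlds.
final class GateSideAdapter {
    private final TierConsumerAgent agent;

    GateSideAdapter(TierConsumerAgent agent) {
        this.agent = agent;
    }

    // Assumption of this sketch: channel index and subpartition id map one to one.
    Optional<ChannelBuffer> read(int channelIndex, int segmentId) {
        return agent.getNextBuffer(channelIndex, segmentId)
                .map(buffer -> new ChannelBuffer(channelIndex, buffer));
    }
}
```

   This keeps `InputChannel` out of the `tier` package, and a tier that reads 
from remote storage can implement the same interface.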



##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245:
##########
@@ -173,6 +173,7 @@ org.apache.flink.runtime.io.network.api.CheckpointBarrier 
does not satisfy: anno
 org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated 
with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfPartitionEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated
+org.apache.flink.runtime.io.network.api.EndOfSegmentEvent does not satisfy: 
annotated with @Internal or annotated with @Experimental or annotated with 
@PublicEvolving or annotated with @Public or annotated with @Deprecated

Review Comment:
   Actually, I don't think any shuffle event should belong to the `api` 
package. We can probably deal with this later, as it's not introduced by this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.