xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1162243590


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   Why do we need `isEndOfPartition` and `segmentId` for each emit?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)
+            throws IOException;
+
+    /** Closes the {@link TierStorage}, the opened channels should be closed. */
+    void close();
+
+    /** Releases the {@link TierStorage}, all resources should be released. */
+    void release();

Review Comment:
   What is the difference between `close()` and `release()`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers is independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+    TierStorage createPartitionTierStorage();

Review Comment:
   This doesn't make sense. Why are we creating a storage from a writer? 
Shouldn't it be the other way around?
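
   A minimal sketch of the inverted relationship, where the storage hands out the writer. All names here (`SketchTierStorage`, `SketchTierWriter`, `InMemorySketchStorage`, `createWriter`, `write`) are hypothetical and only illustrate the direction of the dependency; this is not a proposal for the final API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer handle; it only knows how to write into the storage that created it. */
interface SketchTierWriter {
    void write(int subpartitionId, byte[] data);
}

/** Hypothetical storage abstraction; the storage owns the data and creates its writers. */
interface SketchTierStorage {
    SketchTierWriter createWriter();
}

/** Toy in-memory storage, assumed here purely for illustration. */
final class InMemorySketchStorage implements SketchTierStorage {
    private final List<byte[]> stored = new ArrayList<>();

    @Override
    public SketchTierWriter createWriter() {
        // The writer is a thin view over the storage, not the other way around.
        return (subpartitionId, data) -> stored.add(data.clone());
    }

    int storedCount() {
        return stored.size();
    }
}
```

   With this shape, a reading counterpart could later be created from the same storage object without touching the writer.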



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TierType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The storage types for tiered store. */
+public enum TierType {

Review Comment:
   The enum type indicates that only these 3 types are supported. The question 
is: which components should be aware of that? Shouldn't this be limited to a 
factory or something?
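
   A rough sketch of what limiting the enum to a factory could look like. The enum name `SketchTierType`, its constants, and the `SketchTier` / `SketchTierFactory` types are assumptions made for illustration; they are not the names used in this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the enum under review; the constant names are assumptions. */
enum SketchTierType {
    MEMORY,
    DISK,
    REMOTE
}

/** Hypothetical tier abstraction; all other components would depend only on this interface. */
interface SketchTier {
    String name();
}

/** The only place that switches on the enum, so adding a tier touches a single class. */
final class SketchTierFactory {
    private SketchTierFactory() {}

    static SketchTier create(SketchTierType type) {
        switch (type) {
            case MEMORY:
                return () -> "memory";
            case DISK:
                return () -> "disk";
            case REMOTE:
                return () -> "remote";
            default:
                throw new IllegalArgumentException("Unknown tier type: " + type);
        }
    }

    static List<SketchTier> createAll(List<SketchTierType> types) {
        List<SketchTier> tiers = new ArrayList<>();
        for (SketchTierType type : types) {
            tiers.add(create(type));
        }
        return tiers;
    }
}
```

   Everything downstream of the factory would then work against the tier interface and never see the enum.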



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   And why does a `Storage` have only writing interfaces but no reading ones?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   `partition`, `subpartition` and `emit` are still shuffle terminology.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.