This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a903fa0a30a8717653426a4fa5ac0229c5c08b4
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed May 10 19:47:37 2023 +0800

    [FLINK-31635][network] Introduce the interfaces in the tier for the tiered 
storage
---
 .../tiered/common/TieredStorageConfiguration.java  | 11 +++++
 .../TierConsumerAgent.java}                        | 26 ++----------
 .../TierFactory.java}                              | 28 ++++---------
 .../TierMasterAgent.java}                          | 35 +++++++---------
 .../hybrid/tiered/tier/TierProducerAgent.java      | 49 ++++++++++++++++++++++
 5 files changed, 88 insertions(+), 61 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index 185c267bd6f..e0d40c40c2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -19,10 +19,21 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+
+import java.util.Arrays;
+import java.util.List;
 
 /** Configurations for the Tiered Storage. */
 public class TieredStorageConfiguration {
 
+    // TODO, after implementing the tier factory, add appreciate 
implementations to the array.
+    private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = 
new TierFactory[0];
+
+    public List<TierFactory> getTierFactories() {
+        return Arrays.asList(DEFAULT_MEMORY_DISK_TIER_FACTORIES);
+    }
+
     public static TieredStorageConfiguration.Builder builder() {
         return new TieredStorageConfiguration.Builder();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
similarity index 51%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
index 185c267bd6f..b9dc497089c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
@@ -16,27 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import org.apache.flink.configuration.Configuration;
-
-/** Configurations for the Tiered Storage. */
-public class TieredStorageConfiguration {
-
-    public static TieredStorageConfiguration.Builder builder() {
-        return new TieredStorageConfiguration.Builder();
-    }
-
-    /** Builder for {@link TieredStorageConfiguration}. */
-    public static class Builder {
-        public TieredStorageConfiguration build() {
-            return new TieredStorageConfiguration();
-        }
-    }
-
-    public static TieredStorageConfiguration fromConfiguration(Configuration 
conf) {
-        // TODO, from the configuration, get the configured options(i.e., 
remote storage path, the
-        // reserved storage size, etc.), then set them to the builder.
-        return new TieredStorageConfiguration.Builder().build();
-    }
-}
+/** The consumer-side agent of a Tier. */
+public interface TierConsumerAgent {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
similarity index 51%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
index 185c267bd6f..3225c3a5a09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
@@ -16,27 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import org.apache.flink.configuration.Configuration;
+/** A factory that creates all the components of a tier. */
+public interface TierFactory {
 
-/** Configurations for the Tiered Storage. */
-public class TieredStorageConfiguration {
+    /** Creates the master-side agent of a Tier. */
+    TierMasterAgent createMasterAgent();
 
-    public static TieredStorageConfiguration.Builder builder() {
-        return new TieredStorageConfiguration.Builder();
-    }
+    /** Creates the producer-side agent of a Tier. */
+    TierProducerAgent createProducerAgent();
 
-    /** Builder for {@link TieredStorageConfiguration}. */
-    public static class Builder {
-        public TieredStorageConfiguration build() {
-            return new TieredStorageConfiguration();
-        }
-    }
-
-    public static TieredStorageConfiguration fromConfiguration(Configuration 
conf) {
-        // TODO, from the configuration, get the configured options(i.e., 
remote storage path, the
-        // reserved storage size, etc.), then set them to the builder.
-        return new TieredStorageConfiguration.Builder().build();
-    }
+    /** Creates the consumer-side agent of a Tier. */
+    TierConsumerAgent createConsumerAgent();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
similarity index 51%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
index 185c267bd6f..2c34cdd38c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
@@ -16,27 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 
-/** Configurations for the Tiered Storage. */
-public class TieredStorageConfiguration {
+/** The master-side agent of a Tier. */
+public interface TierMasterAgent {
 
-    public static TieredStorageConfiguration.Builder builder() {
-        return new TieredStorageConfiguration.Builder();
-    }
+    /**
+     * Add a new tiered storage partition.
+     *
+     * @param partitionId the identifier of the new partition
+     */
+    void addPartition(TieredStoragePartitionId partitionId);
 
-    /** Builder for {@link TieredStorageConfiguration}. */
-    public static class Builder {
-        public TieredStorageConfiguration build() {
-            return new TieredStorageConfiguration();
-        }
-    }
-
-    public static TieredStorageConfiguration fromConfiguration(Configuration 
conf) {
-        // TODO, from the configuration, get the configured options(i.e., 
remote storage path, the
-        // reserved storage size, etc.), then set them to the builder.
-        return new TieredStorageConfiguration.Builder().build();
-    }
+    /**
+     * Release a tiered storage partition.
+     *
+     * @param partitionId the identifier of partition to be released
+     */
+    void releasePartition(TieredStoragePartitionId partitionId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
new file mode 100644
index 00000000000..9c39e8e05ef
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+
+/** The producer-side agent of a Tier. */
+public interface TierProducerAgent {
+
+    /**
+     * Try to start a new segment in the Tier.
+     *
+     * @param subpartitionId subpartition id that the new segment belongs to
+     * @param segmentId id of the new segment
+     * @return true if the segment can be started, false otherwise.
+     */
+    boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int 
segmentId);
+
+    /** Writes the finished {@link Buffer} to the consumer. */
+    boolean write(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer)
+            throws IOException;
+
+    /**
+     * Close the agent.
+     *
+     * <p>Note this only releases resources directly hold by the agent, which 
excludes resources
+     * managed by the resource registry.
+     */
+    void close();
+}

Reply via email to