This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd11c4036eff2e6cefcaabef0658e4f5886e109b
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed May 10 19:49:01 2023 +0800

    [FLINK-31635][network] Introduce tiered storage resource and the resource registry
---
 .../TieredStorageResource.java}                    | 16 ++----
 .../storage/TieredStorageResourceRegistry.java     | 62 ++++++++++++++++++++++
 .../partition/hybrid/tiered/tier/TierFactory.java  |  4 +-
 3 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResource.java
similarity index 68%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResource.java
index 3225c3a5a09..fa1d85111ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResource.java
@@ -16,17 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
-/** A factory that creates all the components of a tier. */
-public interface TierFactory {
+/** The resource (e.g., local files, remote storage files, etc.) for the Tiered Storage. */
+public interface TieredStorageResource {
 
-    /** Creates the master-side agent of a Tier. */
-    TierMasterAgent createMasterAgent();
-
-    /** Creates the producer-side agent of a Tier. */
-    TierProducerAgent createProducerAgent();
-
-    /** Creates the consumer-side agent of a Tier. */
-    TierConsumerAgent createConsumerAgent();
+    /** Release all the resources, e.g. delete the files, recycle the occupied memory, etc. */
+    void release();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResourceRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResourceRegistry.java
new file mode 100644
index 00000000000..cae47f969f7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResourceRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageDataIdentifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A registry that maintains local or remote resources that correspond to a certain set of data in
+ * the Tiered Storage.
+ */
+public class TieredStorageResourceRegistry {
+
+    private final Map<TieredStorageDataIdentifier, List<TieredStorageResource>>
+            registeredResources = new HashMap<>();
+
+    /**
+     * Register a new resource for the given owner.
+     *
+     * @param owner identifier of the data that the resource corresponds to.
+     * @param tieredStorageResource the tiered storage resources to be registered.
+     */
+    public void registerResource(
+            TieredStorageDataIdentifier owner, TieredStorageResource tieredStorageResource) {
+        registeredResources
+                .computeIfAbsent(owner, (ignore) -> new ArrayList<>())
+                .add(tieredStorageResource);
+    }
+
+    /**
+     * Remove all resources for the given owner.
+     *
+     * @param owner identifier of the data that the resources correspond to.
+     */
+    public void clearResourceFor(TieredStorageDataIdentifier owner) {
+        List<TieredStorageResource> cleanersForOwner = registeredResources.remove(owner);
+
+        if (cleanersForOwner != null) {
+            cleanersForOwner.forEach(TieredStorageResource::release);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
index 3225c3a5a09..5499fc59ff6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+
 /** A factory that creates all the components of a tier. */
 public interface TierFactory {
 
     /** Creates the master-side agent of a Tier. */
-    TierMasterAgent createMasterAgent();
+    TierMasterAgent createMasterAgent(TieredStorageResourceRegistry tieredStorageResourceRegistry);
 
     /** Creates the producer-side agent of a Tier. */
     TierProducerAgent createProducerAgent();

Reply via email to