This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64992471c5dd52fb4269f6de51c15e27c2a45b55
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Oct 26 14:38:25 2023 +0800

    [FLINK-33354][runtime][refactor] Refactor ShuffleDescriptorsCache into a 
generic GroupCache
---
 .../deployment/InputGateDeploymentDescriptor.java  |   9 +-
 .../deployment/TaskDeploymentDescriptor.java       |   5 +-
 .../DefaultShuffleDescriptorsCache.java            | 153 -------------------
 .../taskexecutor/ShuffleDescriptorsCache.java      |  54 -------
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   8 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  17 ++-
 .../flink/runtime/util/DefaultGroupCache.java      | 163 +++++++++++++++++++++
 .../org/apache/flink/runtime/util/GroupCache.java  |  50 +++++++
 .../DefaultExecutionGraphDeploymentTest.java       |   5 +-
 ...hDeploymentWithSmallBlobCacheSizeLimitTest.java |   5 +-
 .../taskexecutor/TaskManagerServicesBuilder.java   |   3 +-
 .../DefaultGroupCacheTest.java}                    |  45 +++---
 .../NoOpGroupCache.java}                           |  19 +--
 13 files changed, 274 insertions(+), 262 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 50ed98ed1dc..333a91e0a73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache;
+import org.apache.flink.runtime.util.GroupCache;
 import org.apache.flink.util.CompressedSerializedValue;
 import org.apache.flink.util.Preconditions;
 
@@ -167,7 +167,7 @@ public class InputGateDeploymentDescriptor implements 
Serializable {
     public void tryLoadAndDeserializeShuffleDescriptors(
             @Nullable PermanentBlobService blobService,
             JobID jobId,
-            ShuffleDescriptorsCache shuffleDescriptorsCache)
+            GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache)
             throws IOException {
         if (inputChannels != null) {
             return;
@@ -190,13 +190,14 @@ public class InputGateDeploymentDescriptor implements 
Serializable {
             @Nullable PermanentBlobService blobService,
             JobID jobId,
             MaybeOffloaded<ShuffleDescriptorGroup> 
serializedShuffleDescriptors,
-            ShuffleDescriptorsCache shuffleDescriptorsCache)
+            GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache)
             throws IOException, ClassNotFoundException {
         if (serializedShuffleDescriptors instanceof Offloaded) {
             PermanentBlobKey blobKey =
                     ((Offloaded<ShuffleDescriptorGroup>) 
serializedShuffleDescriptors)
                             .serializedValueKey;
-            ShuffleDescriptorGroup shuffleDescriptorGroup = 
shuffleDescriptorsCache.get(blobKey);
+            ShuffleDescriptorGroup shuffleDescriptorGroup =
+                    shuffleDescriptorsCache.get(jobId, blobKey);
             if (shuffleDescriptorGroup == null) {
                 Preconditions.checkNotNull(blobService);
                 // NOTE: Do not delete the ShuffleDescriptor BLOBs since it 
may be needed again
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index bd3b770142c..5985801a5f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,10 +23,11 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache;
+import org.apache.flink.runtime.util.GroupCache;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -242,7 +243,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
      */
     public void loadBigData(
             @Nullable PermanentBlobService blobService,
-            ShuffleDescriptorsCache shuffleDescriptorsCache)
+            GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache)
             throws IOException, ClassNotFoundException {
 
         // re-integrate offloaded job info from blob
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java
deleted file mode 100644
index 99a97f2370b..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
-
-import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
-import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
-import 
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be 
expired after timeout. */
-public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache 
{
-    private final Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry> 
shuffleDescriptorsCache;
-    private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;
-
-    private DefaultShuffleDescriptorsCache(
-            Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
-        this.cachedBlobKeysPerJob = new HashMap<>();
-        this.shuffleDescriptorsCache =
-                CacheBuilder.newBuilder()
-                        .concurrencyLevel(1)
-                        .maximumSize(cacheSizeLimit)
-                        .expireAfterAccess(expireTimeout)
-                        .ticker(ticker)
-                        .removalListener(this::onCacheRemoval)
-                        .build();
-    }
-
-    @Override
-    public void clear() {
-        cachedBlobKeysPerJob.clear();
-        shuffleDescriptorsCache.cleanUp();
-    }
-
-    @Override
-    public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
-        ShuffleDescriptorCacheEntry entry = 
shuffleDescriptorsCache.getIfPresent(blobKey);
-        return entry == null ? null : entry.getShuffleDescriptorGroup();
-    }
-
-    @Override
-    public void put(
-            JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup 
shuffleDescriptorGroup) {
-        shuffleDescriptorsCache.put(
-                blobKey, new 
ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId));
-        cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
-    }
-
-    @Override
-    public void clearCacheForJob(JobID jobId) {
-        Set<PermanentBlobKey> removed = cachedBlobKeysPerJob.remove(jobId);
-        if (removed != null) {
-            shuffleDescriptorsCache.invalidateAll(removed);
-        }
-    }
-
-    /**
-     * Removal listener that remove the index of 
serializedShuffleDescriptorsPerJob .
-     *
-     * @param removalNotification of removed element.
-     */
-    private void onCacheRemoval(
-            RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry>
-                    removalNotification) {
-        PermanentBlobKey blobKey = removalNotification.getKey();
-        ShuffleDescriptorCacheEntry entry = removalNotification.getValue();
-        if (blobKey != null && entry != null) {
-            cachedBlobKeysPerJob.computeIfPresent(
-                    entry.getJobId(),
-                    (jobID, permanentBlobKeys) -> {
-                        permanentBlobKeys.remove(blobKey);
-                        if (permanentBlobKeys.isEmpty()) {
-                            return null;
-                        } else {
-                            return permanentBlobKeys;
-                        }
-                    });
-        }
-    }
-
-    private static class ShuffleDescriptorCacheEntry {
-        private final ShuffleDescriptorGroup shuffleDescriptorGroup;
-        private final JobID jobId;
-
-        public ShuffleDescriptorCacheEntry(
-                ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobId) {
-            this.shuffleDescriptorGroup = checkNotNull(shuffleDescriptorGroup);
-            this.jobId = checkNotNull(jobId);
-        }
-
-        public ShuffleDescriptorGroup getShuffleDescriptorGroup() {
-            return shuffleDescriptorGroup;
-        }
-
-        public JobID getJobId() {
-            return jobId;
-        }
-    }
-
-    /** The Factory of {@link DefaultShuffleDescriptorsCache}. */
-    public static class Factory {
-        private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = 
Duration.ofSeconds(300);
-        private static final int DEFAULT_CACHE_SIZE_LIMIT = 100;
-        private static final Ticker DEFAULT_TICKER = Ticker.systemTicker();
-
-        private final Duration cacheExpireTimeout;
-        private final int cacheSizeLimit;
-        private final Ticker ticker;
-
-        public Factory() {
-            this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, 
DEFAULT_TICKER);
-        }
-
-        @VisibleForTesting
-        public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker 
ticker) {
-            this.cacheExpireTimeout = cacheExpireTimeout;
-            this.cacheSizeLimit = cacheSizeLimit;
-            this.ticker = ticker;
-        }
-
-        public DefaultShuffleDescriptorsCache create() {
-            return new DefaultShuffleDescriptorsCache(cacheExpireTimeout, 
cacheSizeLimit, ticker);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java
deleted file mode 100644
index a86e6a67722..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
-
-/** Cache of shuffle descriptors in TaskExecutor. */
-public interface ShuffleDescriptorsCache {
-
-    /** clear all cache. */
-    void clear();
-
-    /**
-     * Get shuffle descriptor group in cache.
-     *
-     * @param blobKey identify the shuffle descriptor group
-     * @return shuffle descriptor group in cache if exists, otherwise null
-     */
-    ShuffleDescriptorGroup get(PermanentBlobKey blobKey);
-
-    /**
-     * Put shuffle descriptor group to cache.
-     *
-     * @param jobId of job
-     * @param blobKey identify the shuffle descriptor group
-     * @param shuffleDescriptorGroup shuffle descriptor group to cache
-     */
-    void put(JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup 
shuffleDescriptorGroup);
-
-    /**
-     * Clear all cache for the Job.
-     *
-     * @param jobId of job
-     */
-    void clearCacheForJob(JobID jobId);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 680514f09a6..7146c2befa9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.management.jmx.JMXService;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.JobPermanentBlobService;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TaskExecutorBlobService;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.blob.TransientBlobService;
@@ -39,6 +40,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -131,6 +133,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.GroupCache;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.CollectionUtil;
@@ -297,7 +300,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
     private final ThreadInfoSampleService threadInfoSampleService;
 
-    private final ShuffleDescriptorsCache shuffleDescriptorsCache;
+    private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup>
+            shuffleDescriptorsCache;
 
     public TaskExecutor(
             RpcService rpcService,
@@ -1903,7 +1907,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         changelogStoragesManager.releaseResourcesForJob(jobId);
         currentSlotOfferPerJob.remove(jobId);
         channelStateExecutorFactoryManager.releaseResourcesForJob(jobId);
-        shuffleDescriptorsCache.clearCacheForJob(jobId);
+        shuffleDescriptorsCache.clearCacheForGroup(jobId);
         fileMergingManager.releaseMergingSnapshotManagerForJob(jobId);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index f4b206421aa..ae78575410e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -48,6 +51,8 @@ import 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.DefaultGroupCache;
+import org.apache.flink.runtime.util.GroupCache;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -90,7 +95,8 @@ public class TaskManagerServices {
     private final LibraryCacheManager libraryCacheManager;
     private final SlotAllocationSnapshotPersistenceService 
slotAllocationSnapshotPersistenceService;
     private final SharedResources sharedResources;
-    private final ShuffleDescriptorsCache shuffleDescriptorsCache;
+    private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup>
+            shuffleDescriptorsCache;
 
     TaskManagerServices(
             UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
@@ -111,7 +117,7 @@ public class TaskManagerServices {
             LibraryCacheManager libraryCacheManager,
             SlotAllocationSnapshotPersistenceService 
slotAllocationSnapshotPersistenceService,
             SharedResources sharedResources,
-            ShuffleDescriptorsCache shuffleDescriptorsCache) {
+            GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache) {
 
         this.unresolvedTaskManagerLocation =
                 Preconditions.checkNotNull(unresolvedTaskManagerLocation);
@@ -208,7 +214,7 @@ public class TaskManagerServices {
         return sharedResources;
     }
 
-    public ShuffleDescriptorsCache getShuffleDescriptorCache() {
+    public GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
getShuffleDescriptorCache() {
         return shuffleDescriptorsCache;
     }
 
@@ -409,8 +415,9 @@ public class TaskManagerServices {
                     NoOpSlotAllocationSnapshotPersistenceService.INSTANCE;
         }
 
-        final ShuffleDescriptorsCache shuffleDescriptorsCache =
-                new DefaultShuffleDescriptorsCache.Factory().create();
+        final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>()
+                        .create();
 
         return new TaskManagerServices(
                 unresolvedTaskManagerLocation,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java
new file mode 100644
index 00000000000..e424e0521b6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import 
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implementation of {@link GroupCache}. Entries expire after a period without access. */
+@NotThreadSafe
+public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> {
+    private final Cache<CacheKey<G, K>, V> cache;
+    /** Index from each group to the cache keys currently stored for it. */
+    private final Map<G, Set<CacheKey<G, K>>> cachedKeysPerGroup;
+
+    private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
+        this.cachedKeysPerGroup = new HashMap<>();
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .maximumSize(cacheSizeLimit)
+                        .expireAfterAccess(expireTimeout)
+                        .ticker(ticker)
+                        .removalListener(this::onCacheRemoval)
+                        .build();
+    }
+
+    @Override
+    public void clear() {
+        // cleanUp() would only run pending maintenance; invalidateAll() discards every entry.
+        cache.invalidateAll();
+        cachedKeysPerGroup.clear();
+    }
+
+    @Override
+    public V get(G group, K key) {
+        return cache.getIfPresent(new CacheKey<>(group, key));
+    }
+
+    @Override
+    public void put(G group, K key, V value) {
+        CacheKey<G, K> cacheKey = new CacheKey<>(group, key);
+        // Write the cache before the index: replacing an existing value may notify
+        // onCacheRemoval, which would otherwise drop this key from the group index.
+        cache.put(cacheKey, value);
+        cachedKeysPerGroup.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey);
+    }
+
+    @Override
+    public void clearCacheForGroup(G group) {
+        Set<CacheKey<G, K>> removed = cachedKeysPerGroup.remove(group);
+        if (removed != null) {
+            cache.invalidateAll(removed);
+        }
+    }
+
+    /**
+     * Removal listener that removes the cache key from the index of its group.
+     *
+     * @param removalNotification of removed element.
+     */
+    private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) {
+        CacheKey<G, K> cacheKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (cacheKey != null && value != null) {
+            cachedKeysPerGroup.computeIfPresent(
+                    cacheKey.getGroup(),
+                    (group, keys) -> {
+                        keys.remove(cacheKey);
+                        // Returning null drops the group once its last key is gone.
+                        return keys.isEmpty() ? null : keys;
+                    });
+        }
+    }
+
+    /** Composite cache key made of a group and a key within that group. */
+    private static class CacheKey<G, K> {
+        private final G group;
+        private final K key;
+
+        public CacheKey(G group, K key) {
+            this.group = group;
+            this.key = key;
+        }
+
+        public G getGroup() {
+            return group;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CacheKey<?, ?> cacheKey = (CacheKey<?, ?>) o;
+            return Objects.equals(group, cacheKey.group) && Objects.equals(key, cacheKey.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(group, key);
+        }
+    }
+
+    /** The Factory of {@link DefaultGroupCache}. */
+    public static class Factory<G, K, V> {
+        private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300);
+        private static final int DEFAULT_CACHE_SIZE_LIMIT = 100;
+        private static final Ticker DEFAULT_TICKER = Ticker.systemTicker();
+
+        private final Duration cacheExpireTimeout;
+        private final int cacheSizeLimit;
+        private final Ticker ticker;
+
+        /** Creates a factory with the default expiry timeout, size limit and system ticker. */
+        public Factory() {
+            this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, DEFAULT_TICKER);
+        }
+
+        @VisibleForTesting
+        public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) {
+            this.cacheExpireTimeout = cacheExpireTimeout;
+            this.cacheSizeLimit = cacheSizeLimit;
+            this.ticker = ticker;
+        }
+
+        /** Creates a new, empty {@link DefaultGroupCache} with this factory's settings. */
+        public DefaultGroupCache<G, K, V> create() {
+            return new DefaultGroupCache<>(cacheExpireTimeout, cacheSizeLimit, ticker);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java
new file mode 100644
index 00000000000..25757837878
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import javax.annotation.Nullable;
+
+/**
+ * Cache of values addressed by a group and a key; clearing a group drops all of its entries.
+ *
+ * @param <G> type of the group an entry belongs to
+ * @param <K> type of the key identifying an entry within its group
+ * @param <V> type of the cached value
+ */
+public interface GroupCache<G, K, V> {
+
+    /** Stores {@code value} under {@code key} within {@code group}. */
+    void put(G group, K key, V value);
+
+    /**
+     * Looks up a cached value.
+     *
+     * @param group group the entry belongs to
+     * @param key key of the entry within the group
+     * @return the cached value, or {@code null} if there is no such entry
+     */
+    @Nullable
+    V get(G group, K key);
+
+    /** Removes every entry that belongs to {@code group}. */
+    void clearCacheForGroup(G group);
+
+    /** Removes every entry of every group. */
+    void clear();
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index fc2df072303..54d8fc30964 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -58,7 +58,6 @@ import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -66,6 +65,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.NoOpGroupCache;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.function.FunctionUtils;
@@ -179,8 +179,7 @@ class DefaultExecutionGraphDeploymentTest {
         taskManagerGateway.setSubmitConsumer(
                 FunctionUtils.uncheckedConsumer(
                         taskDeploymentDescriptor -> {
-                            taskDeploymentDescriptor.loadBigData(
-                                    blobCache, 
NoOpShuffleDescriptorsCache.INSTANCE);
+                            taskDeploymentDescriptor.loadBigData(blobCache, 
new NoOpGroupCache<>());
                             tdd.complete(taskDeploymentDescriptor);
                         }));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
index 6fdc9de9ade..1848faa3959 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
-import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.NoOpGroupCache;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
@@ -120,8 +120,7 @@ class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
         taskManagerGateway.setSubmitConsumer(
                 FunctionUtils.uncheckedConsumer(
                         taskDeploymentDescriptor -> {
-                            taskDeploymentDescriptor.loadBigData(
-                                    blobCache, 
NoOpShuffleDescriptorsCache.INSTANCE);
+                            taskDeploymentDescriptor.loadBigData(blobCache, 
new NoOpGroupCache<>());
                             tdds.offer(taskDeploymentDescriptor);
                         }));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 2bd18f543a1..193e56e20c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.NoOpGroupCache;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -200,6 +201,6 @@ public class TaskManagerServicesBuilder {
                 libraryCacheManager,
                 slotAllocationSnapshotPersistenceService,
                 sharedResources,
-                NoOpShuffleDescriptorsCache.INSTANCE);
+                new NoOpGroupCache<>());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java
similarity index 76%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java
index 84c1acaa4e1..b5a3b717094 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskexecutor;
+package org.apache.flink.runtime.util;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -35,14 +35,14 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link DefaultShuffleDescriptorsCache}. */
-class DefaultShuffleDescriptorsCacheTest {
+/** Tests for {@link DefaultGroupCache}. */
+class DefaultGroupCacheTest {
     private final Duration expireTimeout = Duration.ofSeconds(10);
 
     @Test
     void testGetEntry() {
-        DefaultShuffleDescriptorsCache cache =
-                new DefaultShuffleDescriptorsCache.Factory(
+        DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
cache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>(
                                 expireTimeout, Integer.MAX_VALUE, 
Ticker.systemTicker())
                         .create();
 
@@ -56,16 +56,16 @@ class DefaultShuffleDescriptorsCacheTest {
 
         PermanentBlobKey blobKey = new PermanentBlobKey();
 
-        assertThat(cache.get(blobKey)).isNull();
+        assertThat(cache.get(jobId, blobKey)).isNull();
 
         cache.put(jobId, blobKey, shuffleDescriptorGroup);
-        assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup);
+        assertThat(cache.get(jobId, 
blobKey)).isEqualTo(shuffleDescriptorGroup);
     }
 
     @Test
     void testClearCacheForJob() {
-        DefaultShuffleDescriptorsCache cache =
-                new DefaultShuffleDescriptorsCache.Factory(
+        DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
cache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>(
                                 expireTimeout, Integer.MAX_VALUE, 
Ticker.systemTicker())
                         .create();
 
@@ -78,19 +78,20 @@ class DefaultShuffleDescriptorsCacheTest {
                         });
         PermanentBlobKey blobKey = new PermanentBlobKey();
 
-        assertThat(cache.get(blobKey)).isNull();
+        assertThat(cache.get(jobId, blobKey)).isNull();
 
         cache.put(jobId, blobKey, shuffleDescriptorGroup);
-        assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup);
+        assertThat(cache.get(jobId, 
blobKey)).isEqualTo(shuffleDescriptorGroup);
 
-        cache.clearCacheForJob(jobId);
-        assertThat(cache.get(blobKey)).isNull();
+        cache.clearCacheForGroup(jobId);
+        assertThat(cache.get(jobId, blobKey)).isNull();
     }
 
     @Test
     void testPutWhenOverLimit() {
-        DefaultShuffleDescriptorsCache cache =
-                new DefaultShuffleDescriptorsCache.Factory(expireTimeout, 1, 
Ticker.systemTicker())
+        DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
cache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>(
+                                expireTimeout, 1, Ticker.systemTicker())
                         .create();
 
         JobID jobId = new JobID();
@@ -104,7 +105,7 @@ class DefaultShuffleDescriptorsCacheTest {
         PermanentBlobKey blobKey = new PermanentBlobKey();
 
         cache.put(jobId, blobKey, shuffleDescriptorGroup);
-        assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup);
+        assertThat(cache.get(jobId, 
blobKey)).isEqualTo(shuffleDescriptorGroup);
 
         ShuffleDescriptorGroup otherShuffleDescriptorGroup =
                 new ShuffleDescriptorGroup(
@@ -115,15 +116,15 @@ class DefaultShuffleDescriptorsCacheTest {
         PermanentBlobKey otherBlobKey = new PermanentBlobKey();
 
         cache.put(jobId, otherBlobKey, otherShuffleDescriptorGroup);
-        assertThat(cache.get(blobKey)).isNull();
-        
assertThat(cache.get(otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup);
+        assertThat(cache.get(jobId, blobKey)).isNull();
+        assertThat(cache.get(jobId, 
otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup);
     }
 
     @Test
     void testEntryExpired() {
         TestingTicker ticker = new TestingTicker();
-        DefaultShuffleDescriptorsCache cache =
-                new DefaultShuffleDescriptorsCache.Factory(
+        DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
cache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>(
                                 Duration.ofSeconds(1), Integer.MAX_VALUE, 
ticker)
                         .create();
 
@@ -138,10 +139,10 @@ class DefaultShuffleDescriptorsCacheTest {
         PermanentBlobKey blobKey = new PermanentBlobKey();
 
         cache.put(jobId, blobKey, shuffleDescriptorGroup);
-        assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup);
+        assertThat(cache.get(jobId, 
blobKey)).isEqualTo(shuffleDescriptorGroup);
 
         ticker.advance(Duration.ofSeconds(2));
-        assertThat(cache.get(blobKey)).isNull();
+        assertThat(cache.get(jobId, blobKey)).isNull();
     }
 
     private static class TestingTicker extends Ticker {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java
 b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java
similarity index 56%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java
index 5c7ad4236e3..93a0012bdd3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java
@@ -16,29 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskexecutor;
+package org.apache.flink.runtime.util;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
-
-/** Non op implement of {@link ShuffleDescriptorsCache}. */
-public class NoOpShuffleDescriptorsCache implements ShuffleDescriptorsCache {
-
-    public static final NoOpShuffleDescriptorsCache INSTANCE = new 
NoOpShuffleDescriptorsCache();
+/** No-op implementation of {@link GroupCache}. */
+public class NoOpGroupCache<G, K, V> implements GroupCache<G, K, V> {
 
     @Override
     public void clear() {}
 
     @Override
-    public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
+    public V get(G group, K key) {
         return null;
     }
 
     @Override
-    public void put(
-            JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup 
shuffleDescriptorGroup) {}
+    public void put(G group, K key, V value) {}
 
     @Override
-    public void clearCacheForJob(JobID jobId) {}
+    public void clearCacheForGroup(G group) {}
 }


Reply via email to