This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 64992471c5dd52fb4269f6de51c15e27c2a45b55 Author: 1996fanrui <[email protected]> AuthorDate: Thu Oct 26 14:38:25 2023 +0800 [FLINK-33354][runtime][refactor] Refactor ShuffleDescriptorsCache into a generic GroupCache --- .../deployment/InputGateDeploymentDescriptor.java | 9 +- .../deployment/TaskDeploymentDescriptor.java | 5 +- .../DefaultShuffleDescriptorsCache.java | 153 ------------------- .../taskexecutor/ShuffleDescriptorsCache.java | 54 ------- .../flink/runtime/taskexecutor/TaskExecutor.java | 8 +- .../runtime/taskexecutor/TaskManagerServices.java | 17 ++- .../flink/runtime/util/DefaultGroupCache.java | 163 +++++++++++++++++++++ .../org/apache/flink/runtime/util/GroupCache.java | 50 +++++++ .../DefaultExecutionGraphDeploymentTest.java | 5 +- ...hDeploymentWithSmallBlobCacheSizeLimitTest.java | 5 +- .../taskexecutor/TaskManagerServicesBuilder.java | 3 +- .../DefaultGroupCacheTest.java} | 45 +++--- .../NoOpGroupCache.java} | 19 +-- 13 files changed, 274 insertions(+), 262 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java index 50ed98ed1dc..333a91e0a73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.util.CompressedSerializedValue; import org.apache.flink.util.Preconditions; @@ -167,7 +167,7 @@ 
public class InputGateDeploymentDescriptor implements Serializable { public void tryLoadAndDeserializeShuffleDescriptors( @Nullable PermanentBlobService blobService, JobID jobId, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache) throws IOException { if (inputChannels != null) { return; @@ -190,13 +190,14 @@ public class InputGateDeploymentDescriptor implements Serializable { @Nullable PermanentBlobService blobService, JobID jobId, MaybeOffloaded<ShuffleDescriptorGroup> serializedShuffleDescriptors, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache) throws IOException, ClassNotFoundException { if (serializedShuffleDescriptors instanceof Offloaded) { PermanentBlobKey blobKey = ((Offloaded<ShuffleDescriptorGroup>) serializedShuffleDescriptors) .serializedValueKey; - ShuffleDescriptorGroup shuffleDescriptorGroup = shuffleDescriptorsCache.get(blobKey); + ShuffleDescriptorGroup shuffleDescriptorGroup = + shuffleDescriptorsCache.get(jobId, blobKey); if (shuffleDescriptorGroup == null) { Preconditions.checkNotNull(blobService); // NOTE: Do not delete the ShuffleDescriptor BLOBs since it may be needed again diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index bd3b770142c..5985801a5f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -23,10 +23,11 @@ import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -242,7 +243,7 @@ public final class TaskDeploymentDescriptor implements Serializable { */ public void loadBigData( @Nullable PermanentBlobService blobService, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache) throws IOException, ClassNotFoundException { // re-integrate offloaded job info from blob diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java deleted file mode 100644 index 99a97f2370b..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -import org.apache.flink.shaded.guava31.com.google.common.base.Ticker; -import org.apache.flink.shaded.guava31.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; -import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification; - -import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be expired after timeout. 
*/ -public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache { - private final Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry> shuffleDescriptorsCache; - private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob; - - private DefaultShuffleDescriptorsCache( - Duration expireTimeout, int cacheSizeLimit, Ticker ticker) { - this.cachedBlobKeysPerJob = new HashMap<>(); - this.shuffleDescriptorsCache = - CacheBuilder.newBuilder() - .concurrencyLevel(1) - .maximumSize(cacheSizeLimit) - .expireAfterAccess(expireTimeout) - .ticker(ticker) - .removalListener(this::onCacheRemoval) - .build(); - } - - @Override - public void clear() { - cachedBlobKeysPerJob.clear(); - shuffleDescriptorsCache.cleanUp(); - } - - @Override - public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) { - ShuffleDescriptorCacheEntry entry = shuffleDescriptorsCache.getIfPresent(blobKey); - return entry == null ? null : entry.getShuffleDescriptorGroup(); - } - - @Override - public void put( - JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup) { - shuffleDescriptorsCache.put( - blobKey, new ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId)); - cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); - } - - @Override - public void clearCacheForJob(JobID jobId) { - Set<PermanentBlobKey> removed = cachedBlobKeysPerJob.remove(jobId); - if (removed != null) { - shuffleDescriptorsCache.invalidateAll(removed); - } - } - - /** - * Removal listener that remove the index of serializedShuffleDescriptorsPerJob . - * - * @param removalNotification of removed element. 
- */ - private void onCacheRemoval( - RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry> - removalNotification) { - PermanentBlobKey blobKey = removalNotification.getKey(); - ShuffleDescriptorCacheEntry entry = removalNotification.getValue(); - if (blobKey != null && entry != null) { - cachedBlobKeysPerJob.computeIfPresent( - entry.getJobId(), - (jobID, permanentBlobKeys) -> { - permanentBlobKeys.remove(blobKey); - if (permanentBlobKeys.isEmpty()) { - return null; - } else { - return permanentBlobKeys; - } - }); - } - } - - private static class ShuffleDescriptorCacheEntry { - private final ShuffleDescriptorGroup shuffleDescriptorGroup; - private final JobID jobId; - - public ShuffleDescriptorCacheEntry( - ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobId) { - this.shuffleDescriptorGroup = checkNotNull(shuffleDescriptorGroup); - this.jobId = checkNotNull(jobId); - } - - public ShuffleDescriptorGroup getShuffleDescriptorGroup() { - return shuffleDescriptorGroup; - } - - public JobID getJobId() { - return jobId; - } - } - - /** The Factory of {@link DefaultShuffleDescriptorsCache}. 
*/ - public static class Factory { - private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300); - private static final int DEFAULT_CACHE_SIZE_LIMIT = 100; - private static final Ticker DEFAULT_TICKER = Ticker.systemTicker(); - - private final Duration cacheExpireTimeout; - private final int cacheSizeLimit; - private final Ticker ticker; - - public Factory() { - this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, DEFAULT_TICKER); - } - - @VisibleForTesting - public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) { - this.cacheExpireTimeout = cacheExpireTimeout; - this.cacheSizeLimit = cacheSizeLimit; - this.ticker = ticker; - } - - public DefaultShuffleDescriptorsCache create() { - return new DefaultShuffleDescriptorsCache(cacheExpireTimeout, cacheSizeLimit, ticker); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java deleted file mode 100644 index a86e6a67722..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -/** Cache of shuffle descriptors in TaskExecutor. */ -public interface ShuffleDescriptorsCache { - - /** clear all cache. */ - void clear(); - - /** - * Get shuffle descriptor group in cache. - * - * @param blobKey identify the shuffle descriptor group - * @return shuffle descriptor group in cache if exists, otherwise null - */ - ShuffleDescriptorGroup get(PermanentBlobKey blobKey); - - /** - * Put shuffle descriptor group to cache. - * - * @param jobId of job - * @param blobKey identify the shuffle descriptor group - * @param shuffleDescriptorGroup shuffle descriptor group to cache - */ - void put(JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup); - - /** - * Clear all cache for the Job. 
- * - * @param jobId of job - */ - void clearCacheForJob(JobID jobId); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 680514f09a6..7146c2befa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.management.jmx.JMXService; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.JobPermanentBlobService; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.TaskExecutorBlobService; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.blob.TransientBlobService; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -131,6 +133,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import 
org.apache.flink.types.SerializableOptional; import org.apache.flink.util.CollectionUtil; @@ -297,7 +300,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final ThreadInfoSampleService threadInfoSampleService; - private final ShuffleDescriptorsCache shuffleDescriptorsCache; + private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> + shuffleDescriptorsCache; public TaskExecutor( RpcService rpcService, @@ -1903,7 +1907,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { changelogStoragesManager.releaseResourcesForJob(jobId); currentSlotOfferPerJob.remove(jobId); channelStateExecutorFactoryManager.releaseResourcesForJob(jobId); - shuffleDescriptorsCache.clearCacheForJob(jobId); + shuffleDescriptorsCache.clearCacheForGroup(jobId); fileMergingManager.releaseMergingSnapshotManagerForJob(jobId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index f4b206421aa..ae78575410e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.entrypoint.WorkingDirectory; import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -48,6 +51,8 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.DefaultGroupCache; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -90,7 +95,8 @@ public class TaskManagerServices { private final LibraryCacheManager libraryCacheManager; private final SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService; private final SharedResources sharedResources; - private final ShuffleDescriptorsCache shuffleDescriptorsCache; + private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> + shuffleDescriptorsCache; TaskManagerServices( UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, @@ -111,7 +117,7 @@ public class TaskManagerServices { LibraryCacheManager libraryCacheManager, SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService, SharedResources sharedResources, - ShuffleDescriptorsCache shuffleDescriptorsCache) { + GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache) { this.unresolvedTaskManagerLocation = Preconditions.checkNotNull(unresolvedTaskManagerLocation); @@ -208,7 +214,7 @@ public class TaskManagerServices { return sharedResources; } - public ShuffleDescriptorsCache getShuffleDescriptorCache() { + public GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> getShuffleDescriptorCache() { return shuffleDescriptorsCache; } @@ -409,8 +415,9 @@ public class TaskManagerServices { 
NoOpSlotAllocationSnapshotPersistenceService.INSTANCE; } - final ShuffleDescriptorsCache shuffleDescriptorsCache = - new DefaultShuffleDescriptorsCache.Factory().create(); + final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> shuffleDescriptorsCache = + new DefaultGroupCache.Factory<JobID, PermanentBlobKey, ShuffleDescriptorGroup>() + .create(); return new TaskManagerServices( unresolvedTaskManagerLocation, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java new file mode 100644 index 00000000000..e424e0521b6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.shaded.guava31.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava31.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** Default implementation of {@link GroupCache}. Entries expire after a period without access. */ +@NotThreadSafe +public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> { + private final Cache<CacheKey<G, K>, V> cache; + private final Map<G, Set<CacheKey<G, K>>> cachedBlobKeysPerJob; + + private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) { + this.cachedBlobKeysPerJob = new HashMap<>(); + this.cache = + CacheBuilder.newBuilder() + .concurrencyLevel(1) + .maximumSize(cacheSizeLimit) + .expireAfterAccess(expireTimeout) + .ticker(ticker) + .removalListener(this::onCacheRemoval) + .build(); + } + + @Override + public void clear() { + cachedBlobKeysPerJob.clear(); + cache.cleanUp(); + } + + @Override + public V get(G group, K key) { + return cache.getIfPresent(new CacheKey<>(group, key)); + } + + @Override + public void put(G group, K key, V value) { + CacheKey<G, K> cacheKey = new CacheKey<>(group, key); + cache.put(cacheKey, value); + cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey); + } + + @Override + public void clearCacheForGroup(G group) { + Set<CacheKey<G, K>> removed = cachedBlobKeysPerJob.remove(group); + if (removed != null) { + cache.invalidateAll(removed); + } + } + + /** + * Removal listener that removes the evicted cache key from the key set of its group.
+ * + * @param removalNotification of removed element. + */ + private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) { + CacheKey<G, K> cacheKey = removalNotification.getKey(); + V value = removalNotification.getValue(); + if (cacheKey != null && value != null) { + cachedBlobKeysPerJob.computeIfPresent( + cacheKey.getGroup(), + (group, keys) -> { + keys.remove(cacheKey); + if (keys.isEmpty()) { + return null; + } else { + return keys; + } + }); + } + } + + private static class CacheKey<G, K> { + private final G group; + private final K key; + + public CacheKey(G group, K key) { + this.group = group; + this.key = key; + } + + public G getGroup() { + return group; + } + + public K getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey<?, ?> cacheKey = (CacheKey<?, ?>) o; + return Objects.equals(group, cacheKey.group) && Objects.equals(key, cacheKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(group, key); + } + } + + /** The Factory of {@link DefaultGroupCache}. 
*/ + public static class Factory<G, K, V> { + private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300); + private static final int DEFAULT_CACHE_SIZE_LIMIT = 100; + private static final Ticker DEFAULT_TICKER = Ticker.systemTicker(); + + private final Duration cacheExpireTimeout; + private final int cacheSizeLimit; + private final Ticker ticker; + + public Factory() { + this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, DEFAULT_TICKER); + } + + @VisibleForTesting + public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) { + this.cacheExpireTimeout = cacheExpireTimeout; + this.cacheSizeLimit = cacheSizeLimit; + this.ticker = ticker; + } + + public DefaultGroupCache<G, K, V> create() { + return new DefaultGroupCache<>(cacheExpireTimeout, cacheSizeLimit, ticker); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java new file mode 100644 index 00000000000..25757837878 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import javax.annotation.Nullable; + +/** + * A cache whose entries are addressed by a group and a key. The (group, key) pair forms the cache + * key, so each key belongs to exactly one group. Clearing a group removes all keys and values that + * belong to it. + * + * @param <G> The group. + * @param <K> The key. + * @param <V> The value. + */ +public interface GroupCache<G, K, V> { + + /** Clears all cached entries. */ + void clear(); + + /** + * Returns the value cached for the given group and key. + * + * @return the cached value if present, otherwise null + */ + @Nullable + V get(G group, K key); + + /** Puts the value into the cache under the given group and key. */ + void put(G group, K key, V value); + + /** Removes all cached entries that belong to the given group. */ + void clearCacheForGroup(G group); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index fc2df072303..54d8fc30964 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -58,7 +58,6 @@ import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -66,6 +65,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import
org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.NoOpGroupCache; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.function.FunctionUtils; @@ -179,8 +179,7 @@ class DefaultExecutionGraphDeploymentTest { taskManagerGateway.setSubmitConsumer( FunctionUtils.uncheckedConsumer( taskDeploymentDescriptor -> { - taskDeploymentDescriptor.loadBigData( - blobCache, NoOpShuffleDescriptorsCache.INSTANCE); + taskDeploymentDescriptor.loadBigData(blobCache, new NoOpGroupCache<>()); tdd.complete(taskDeploymentDescriptor); })); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java index 6fdc9de9ade..1848faa3959 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java @@ -42,8 +42,8 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; -import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.NoOpGroupCache; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.FunctionUtils; @@ -120,8 +120,7 
@@ class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest taskManagerGateway.setSubmitConsumer( FunctionUtils.uncheckedConsumer( taskDeploymentDescriptor -> { - taskDeploymentDescriptor.loadBigData( - blobCache, NoOpShuffleDescriptorsCache.INSTANCE); + taskDeploymentDescriptor.loadBigData(blobCache, new NoOpGroupCache<>()); tdds.offer(taskDeploymentDescriptor); })); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java index 2bd18f543a1..193e56e20c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable; import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.NoOpGroupCache; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -200,6 +201,6 @@ public class TaskManagerServicesBuilder { libraryCacheManager, slotAllocationSnapshotPersistenceService, sharedResources, - NoOpShuffleDescriptorsCache.INSTANCE); + new NoOpGroupCache<>()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java similarity index 76% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java index 84c1acaa4e1..b5a3b717094 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskexecutor; +package org.apache.flink.runtime.util; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.PermanentBlobKey; @@ -35,14 +35,14 @@ import java.util.concurrent.atomic.AtomicLong; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link DefaultShuffleDescriptorsCache}. */ -class DefaultShuffleDescriptorsCacheTest { +/** Tests for {@link DefaultGroupCache}. */ +class DefaultGroupCacheTest { private final Duration expireTimeout = Duration.ofSeconds(10); @Test void testGetEntry() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> cache = + new DefaultGroupCache.Factory<JobID, PermanentBlobKey, ShuffleDescriptorGroup>( expireTimeout, Integer.MAX_VALUE, Ticker.systemTicker()) .create(); @@ -56,16 +56,16 @@ class DefaultShuffleDescriptorsCacheTest { PermanentBlobKey blobKey = new PermanentBlobKey(); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); } @Test void testClearCacheForJob() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> cache = + new DefaultGroupCache.Factory<JobID, PermanentBlobKey, ShuffleDescriptorGroup>( expireTimeout, Integer.MAX_VALUE, Ticker.systemTicker()) .create(); @@ -78,19 +78,20 @@ class DefaultShuffleDescriptorsCacheTest { }); PermanentBlobKey blobKey = new 
PermanentBlobKey(); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); - cache.clearCacheForJob(jobId); - assertThat(cache.get(blobKey)).isNull(); + cache.clearCacheForGroup(jobId); + assertThat(cache.get(jobId, blobKey)).isNull(); } @Test void testPutWhenOverLimit() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory(expireTimeout, 1, Ticker.systemTicker()) + DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> cache = + new DefaultGroupCache.Factory<JobID, PermanentBlobKey, ShuffleDescriptorGroup>( + expireTimeout, 1, Ticker.systemTicker()) .create(); JobID jobId = new JobID(); @@ -104,7 +105,7 @@ class DefaultShuffleDescriptorsCacheTest { PermanentBlobKey blobKey = new PermanentBlobKey(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); ShuffleDescriptorGroup otherShuffleDescriptorGroup = new ShuffleDescriptorGroup( @@ -115,15 +116,15 @@ class DefaultShuffleDescriptorsCacheTest { PermanentBlobKey otherBlobKey = new PermanentBlobKey(); cache.put(jobId, otherBlobKey, otherShuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isNull(); - assertThat(cache.get(otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isNull(); + assertThat(cache.get(jobId, otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup); } @Test void testEntryExpired() { TestingTicker ticker = new TestingTicker(); - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> cache = + new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>( Duration.ofSeconds(1), Integer.MAX_VALUE, ticker) .create(); @@ -138,10 +139,10 @@ class DefaultShuffleDescriptorsCacheTest { PermanentBlobKey blobKey = new PermanentBlobKey(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); ticker.advance(Duration.ofSeconds(2)); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); } private static class TestingTicker extends Ticker { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java similarity index 56% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java index 5c7ad4236e3..93a0012bdd3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java @@ -16,29 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskexecutor; +package org.apache.flink.runtime.util; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -/** Non op implement of {@link ShuffleDescriptorsCache}. */ -public class NoOpShuffleDescriptorsCache implements ShuffleDescriptorsCache { - - public static final NoOpShuffleDescriptorsCache INSTANCE = new NoOpShuffleDescriptorsCache(); +/** Non op implement of {@link GroupCache}. 
*/ +public class NoOpGroupCache<G, K, V> implements GroupCache<G, K, V> { @Override public void clear() {} @Override - public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) { + public V get(G group, K key) { return null; } @Override - public void put( - JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup) {} + public void put(G group, K key, V value) {} @Override - public void clearCacheForJob(JobID jobId) {} + public void clearCacheForGroup(G group) {} }
