wanglijie95 commented on code in PR #22861:
URL: https://github.com/apache/flink/pull/22861#discussion_r1266379918
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -446,18 +445,33 @@ public int getIndex() {
}
}
+ /** A set of shuffle descriptors that will be serialized. */
+ public static class ShuffleDescriptorGroup implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ShuffleDescriptorAndIndex[] shuffleDescriptors;
+
+ public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[]
shuffleDescriptors) {
+ this.shuffleDescriptors = shuffleDescriptors;
Review Comment:
checkNotNull
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -66,27 +67,31 @@ public CachedShuffleDescriptors(
resultPartitionIdToIndex.put(resultPartitionID, index++);
}
this.toBeSerialized = new ArrayDeque<>(consumedPartitionGroup.size());
- this.serializedShuffleDescriptors = new ArrayList<>();
+ this.serializedShuffleDescriptorGroups = new ArrayList<>();
for (ShuffleDescriptorAndIndex shuffleDescriptor : shuffleDescriptors)
{
toBeSerialized.offer(shuffleDescriptor);
}
}
- public List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>>
getAllSerializedShuffleDescriptors() {
+ public List<MaybeOffloaded<ShuffleDescriptorGroup>>
getAllSerializedShuffleDescriptorGroups() {
// the deployment of task is not executed in jobMaster's main thread,
copy this list to
// avoid new element added to the serializedShuffleDescriptors before
TDD is not serialized.
- return new ArrayList<>(serializedShuffleDescriptors);
+ return new ArrayList<>(serializedShuffleDescriptorGroups);
}
public void serializeShuffleDescriptors(
TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer
shuffleDescriptorSerializer)
throws IOException {
if (!toBeSerialized.isEmpty()) {
- MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializedShuffleDescriptor =
+ ShuffleDescriptorGroup shuffleDescriptorGroup =
+ new ShuffleDescriptorGroup(
+ toBeSerialized.toArray(new
ShuffleDescriptorAndIndex[1]));
Review Comment:
Why did we change `toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0])`
to `toBeSerialized.toArray(new ShuffleDescriptorAndIndex[1])`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+/** Cache of shuffle descriptors in TaskExecutor. */
+public interface ShuffleDescriptorsCache {
+
+ /** clear all cache. */
+ void clear();
+
+ /**
+ * Get shuffle descriptors in cache.
+ *
+ * @param blobKey identify the shuffle descriptors
+ * @return shuffle descriptors in cache if exists, otherwise null
+ */
+ ShuffleDescriptorGroup get(PermanentBlobKey blobKey);
+
+ /**
+ * Put shuffle descriptors to cache.
Review Comment:
```suggestion
* Put shuffle descriptor group to cache.
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultShuffleDescriptorsCache}. */
+class DefaultShuffleDescriptorsCacheTest {
Review Comment:
We also need to add a test case for entry expiration.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -446,18 +445,33 @@ public int getIndex() {
}
}
+ /** A set of shuffle descriptors that will be serialized. */
Review Comment:
```suggestion
/** A set of shuffle descriptors that will be serialized together. */
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java:
##########
@@ -139,58 +142,71 @@ public IndexRange getConsumedSubpartitionIndexRange() {
return consumedSubpartitionIndexRange;
}
- public void loadBigData(@Nullable PermanentBlobService blobService, JobID
jobId)
+ public ShuffleDescriptor[] getShuffleDescriptors() {
+ if (inputChannels == null) {
+ throw new IllegalStateException("InputChannel should not be
null.");
+ }
+ return inputChannels;
+ }
+
+ public void loadBigDataAndDeserializeShuffleDescriptors(
Review Comment:
I suggest changing the name to `tryLoadAndDeserializeShuffleDescriptors`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java:
##########
@@ -68,6 +72,28 @@
public class TaskManagerServices {
private static final Logger LOG =
LoggerFactory.getLogger(TaskManagerServices.class);
+ /**
+ * This is an expert option, that we do not want to expose in the
documentation. The default
+ * value is good enough for almost all cases
+ */
+ @Experimental
+ private static final ConfigOption<Duration>
SHUFFLE_DESCRIPTORS_CACHE_EXPIRE_TIMEOUT =
+
ConfigOptions.key("taskmanager.shuffle-descriptors-cache.expire-timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(300))
+ .withDescription("The expire timeout of shuffle descriptor
caches.");
+
+ /**
+ * This is an expert option, that we do not want to expose in the
documentation. The default
+ * value is good enough for almost all cases
+ */
+ @Experimental
+ private static final ConfigOption<Integer>
SHUFFLE_DESCRIPTORS_CACHE_SIZE_LIMIT =
+
ConfigOptions.key("taskmanager.shuffle-descriptors-cache.size-limit")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The size limit of shuffle descriptor
caches.");
+
Review Comment:
I'd rather not introduce configuration options for these two parameters. We
can provide a new no-arg ctor `DefaultShuffleDescriptorsCache()` that uses the
default values (an expire timeout of 300 seconds and a size limit of 10).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be
expired after timeout. */
+public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache
{
+ Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
Review Comment:
```suggestion
private final Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be
expired after timeout. */
+public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache
{
+ Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
+ private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;
+
+ public DefaultShuffleDescriptorsCache(
+ Duration expireTimeout, int shuffleDescriptorCacheSizeLimit) {
+ this.cachedBlobKeysPerJob = new HashMap<>(4);
+ this.shuffleDescriptorsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .maximumSize(shuffleDescriptorCacheSizeLimit)
+ .expireAfterAccess(expireTimeout)
+ .removalListener(this::onCacheRemoval)
+ .build();
+ }
+
+ @Override
+ public void clear() {
+ cachedBlobKeysPerJob.clear();
+ shuffleDescriptorsCache.cleanUp();
+ }
+
+ @Override
+ public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
+ ShuffleDescriptorCacheEntry entry =
shuffleDescriptorsCache.getIfPresent(blobKey);
+ return entry == null ? null : entry.getShuffleDescriptorGroup();
+ }
+
+ @Override
+ public void put(
+ JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup
shuffleDescriptorGroup) {
+ shuffleDescriptorsCache.put(
+ blobKey, new
ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId));
+ cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new
HashSet<>(4)).add(blobKey);
+ }
+
+ @Override
+ public void clearCacheForJob(JobID jobId) {
+ Set<PermanentBlobKey> removed = cachedBlobKeysPerJob.remove(jobId);
+ if (removed != null) {
+ shuffleDescriptorsCache.invalidateAll(removed);
+ }
+ }
+
+ /**
+ * Removal listener that remove the index of
serializedShuffleDescriptorsPerJob .
+ *
+ * @param removalNotification of removed element.
+ */
+ private void onCacheRemoval(
+ RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry>
+ removalNotification) {
+ ShuffleDescriptorCacheEntry entry = removalNotification.getValue();
+ // Delete the index only when the element is evicted, since we've
already cleaned up in the
+ // other scenarios
+ if (removalNotification.wasEvicted() && entry != null) {
+ cachedBlobKeysPerJob.computeIfPresent(
+ entry.getJobId(),
+ (jobID, permanentBlobKeys) -> {
+ permanentBlobKeys.remove(removalNotification.getKey());
+ if (permanentBlobKeys.isEmpty()) {
+ return null;
+ } else {
+ return permanentBlobKeys;
+ }
+ });
+ }
+ }
+
+ private static class ShuffleDescriptorCacheEntry {
+ private final ShuffleDescriptorGroup shuffleDescriptorGroup;
+ private final JobID jobId;
+
+ public ShuffleDescriptorCacheEntry(
+ ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobId) {
+ this.shuffleDescriptorGroup = shuffleDescriptorGroup;
+ this.jobId = jobId;
Review Comment:
checkNotNull for `shuffleDescriptorGroup` and `jobId`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java:
##########
@@ -139,58 +142,71 @@ public IndexRange getConsumedSubpartitionIndexRange() {
return consumedSubpartitionIndexRange;
}
- public void loadBigData(@Nullable PermanentBlobService blobService, JobID
jobId)
+ public ShuffleDescriptor[] getShuffleDescriptors() {
+ if (inputChannels == null) {
+ throw new IllegalStateException("InputChannel should not be
null.");
+ }
Review Comment:
We can replace the if block with `checkState`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be
expired after timeout. */
+public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache
{
+ Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
+ private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;
+
+ public DefaultShuffleDescriptorsCache(
+ Duration expireTimeout, int shuffleDescriptorCacheSizeLimit) {
+ this.cachedBlobKeysPerJob = new HashMap<>(4);
Review Comment:
According to the [Flink code style
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#collections),
it's not recommended to set the initial capacity of a collection unless there
is a good proven reason.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+/** Cache of shuffle descriptors in TaskExecutor. */
+public interface ShuffleDescriptorsCache {
+
+ /** clear all cache. */
+ void clear();
+
+ /**
+ * Get shuffle descriptors in cache.
Review Comment:
```suggestion
* Get shuffle descriptor group in cache.
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java:
##########
@@ -139,58 +142,71 @@ public IndexRange getConsumedSubpartitionIndexRange() {
return consumedSubpartitionIndexRange;
}
- public void loadBigData(@Nullable PermanentBlobService blobService, JobID
jobId)
+ public ShuffleDescriptor[] getShuffleDescriptors() {
+ if (inputChannels == null) {
+ throw new IllegalStateException("InputChannel should not be
null.");
+ }
+ return inputChannels;
+ }
+
+ public void loadBigDataAndDeserializeShuffleDescriptors(
+ @Nullable PermanentBlobService blobService,
+ JobID jobId,
+ ShuffleDescriptorsCache shuffleDescriptorsCache)
throws IOException {
- for (int i = 0; i < serializedInputChannels.size(); i++) {
- MaybeOffloaded<ShuffleDescriptorAndIndex[]> shuffleDescriptors =
- serializedInputChannels.get(i);
- if (shuffleDescriptors instanceof Offloaded) {
- PermanentBlobKey blobKey =
- ((Offloaded<ShuffleDescriptorAndIndex[]>)
shuffleDescriptors)
- .serializedValueKey;
+ try {
+ if (inputChannels == null) {
+ inputChannels = new ShuffleDescriptor[numberOfInputChannels];
+ }
- Preconditions.checkNotNull(blobService);
+ for (MaybeOffloaded<ShuffleDescriptorGroup>
serializedShuffleDescriptors :
+ serializedInputChannels) {
+ loadBigDataAndDeserializeShuffleDescriptor(
+ blobService, jobId, serializedShuffleDescriptors,
shuffleDescriptorsCache);
+ }
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Could not deserialize shuffle
descriptors.", e);
+ }
+ }
+ private void loadBigDataAndDeserializeShuffleDescriptor(
Review Comment:
I suggest changing the name to `tryLoadAndDeserializeShuffleDescriptorGroup`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be
expired after timeout. */
+public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache
{
+ Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
+ private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;
+
+ public DefaultShuffleDescriptorsCache(
+ Duration expireTimeout, int shuffleDescriptorCacheSizeLimit) {
+ this.cachedBlobKeysPerJob = new HashMap<>(4);
+ this.shuffleDescriptorsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .maximumSize(shuffleDescriptorCacheSizeLimit)
+ .expireAfterAccess(expireTimeout)
+ .removalListener(this::onCacheRemoval)
+ .build();
+ }
+
+ @Override
+ public void clear() {
+ cachedBlobKeysPerJob.clear();
+ shuffleDescriptorsCache.cleanUp();
+ }
+
+ @Override
+ public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
+ ShuffleDescriptorCacheEntry entry =
shuffleDescriptorsCache.getIfPresent(blobKey);
+ return entry == null ? null : entry.getShuffleDescriptorGroup();
+ }
+
+ @Override
+ public void put(
+ JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup
shuffleDescriptorGroup) {
+ shuffleDescriptorsCache.put(
+ blobKey, new
ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId));
+ cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new
HashSet<>(4)).add(blobKey);
Review Comment:
ditto
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be
expired after timeout. */
+public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache
{
+ Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry>
shuffleDescriptorsCache;
+ private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;
+
+ public DefaultShuffleDescriptorsCache(
+ Duration expireTimeout, int shuffleDescriptorCacheSizeLimit) {
+ this.cachedBlobKeysPerJob = new HashMap<>(4);
+ this.shuffleDescriptorsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .maximumSize(shuffleDescriptorCacheSizeLimit)
+ .expireAfterAccess(expireTimeout)
+ .removalListener(this::onCacheRemoval)
+ .build();
+ }
+
+ @Override
+ public void clear() {
+ cachedBlobKeysPerJob.clear();
+ shuffleDescriptorsCache.cleanUp();
+ }
+
+ @Override
+ public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
+ ShuffleDescriptorCacheEntry entry =
shuffleDescriptorsCache.getIfPresent(blobKey);
+ return entry == null ? null : entry.getShuffleDescriptorGroup();
+ }
+
+ @Override
+ public void put(
+ JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup
shuffleDescriptorGroup) {
+ shuffleDescriptorsCache.put(
+ blobKey, new
ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId));
+ cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new
HashSet<>(4)).add(blobKey);
+ }
+
+ @Override
+ public void clearCacheForJob(JobID jobId) {
+ Set<PermanentBlobKey> removed = cachedBlobKeysPerJob.remove(jobId);
+ if (removed != null) {
+ shuffleDescriptorsCache.invalidateAll(removed);
+ }
+ }
+
+ /**
+ * Removal listener that remove the index of
serializedShuffleDescriptorsPerJob .
+ *
+ * @param removalNotification of removed element.
+ */
+ private void onCacheRemoval(
+ RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry>
+ removalNotification) {
+ ShuffleDescriptorCacheEntry entry = removalNotification.getValue();
+ // Delete the index only when the element is evicted, since we've
already cleaned up in the
+ // other scenarios
+ if (removalNotification.wasEvicted() && entry != null) {
Review Comment:
Would correctness be affected if we only checked `entry != null` here? If
not, I suggest removing `removalNotification.wasEvicted()` (and the comment
above it), because the extra condition makes the code harder to
understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]