Thesharing commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677282028



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    @Test
+    public void testDeployTasksWithMinimumSizeLimit() throws Exception {
+
+        final int numberOfVertices = 4;
+        final int parallelism = 10;
+
+        final ExecutionGraph eg = createAndSetupExecutionGraph(numberOfVertices, parallelism);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final BlockingQueue<TaskDeploymentDescriptor> tdds =
+                new ArrayBlockingQueue<>(numberOfVertices * parallelism);
+        taskManagerGateway.setSubmitConsumer(
+                FunctionUtils.uncheckedConsumer(
+                        taskDeploymentDescriptor -> {
+                            taskDeploymentDescriptor.loadBigData(blobCache);
+                            tdds.offer(taskDeploymentDescriptor);
+                        }));
+
+        for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+
+                assertEquals(ExecutionState.CREATED, ev.getExecutionState());
+
+                LogicalSlot slot =
+                        new TestingLogicalSlotBuilder()
+                                .setTaskManagerGateway(taskManagerGateway)
+                                .createTestingLogicalSlot();
+                ev.getCurrentExecutionAttempt()
+                        .registerProducedPartitions(slot.getTaskManagerLocation(), true)
+                        .get();
+                ev.deployToSlot(slot);
+                assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());
+
+                TaskDeploymentDescriptor tdd = tdds.take();
+                assertNotNull(tdd);
+
+                List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates();
+                assertEquals(ev.getAllConsumedPartitionGroups().size(), igdds.size());
+
+                if (igdds.size() > 0) {
+                    assertShuffleDescriptorsEqual(igdds.get(0), ev.getConsumedPartitionGroup(0));
+                }
+            }
+        }
+    }
+
+    private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, int parallelism)
+            throws JobException, JobExecutionException {
+
+        final List<JobVertex> vertices = new ArrayList<>();
+
+        for (int i = 0; i < numberOfVertices; i++) {
+            JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new JobVertexID());
+            vertex.setParallelism(parallelism);
+            vertex.setInvokableClass(BatchTask.class);
+            vertices.add(vertex);
+        }
+
+        for (int i = 1; i < numberOfVertices; i++) {
+            vertices.get(i)
+                    .connectNewDataSetAsInput(
+                            vertices.get(i - 1),
+                            DistributionPattern.ALL_TO_ALL,
+                            ResultPartitionType.BLOCKING);
+        }
+
+        final JobGraph jobGraph =
+                JobGraphTestUtils.batchJobGraph(vertices.toArray(new JobVertex[0]));
+
+        final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
+        final DefaultExecutionGraph eg =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setFutureExecutor(executor)
+                        .setIoExecutor(executor)
+                        .setBlobWriter(blobWriter)
+                        .build();
+
+        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return eg;
+    }
+
+    private static void assertShuffleDescriptorsEqual(

Review comment:
       Thank you for pointing this out. Resolved.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    @Test
+    public void testDeployTasksWithMinimumSizeLimit() throws Exception {

Review comment:
       Because `DefaultExecutionGraphDeploymentTest#testBuildDeploymentDescriptor` deploys only one 
task, it cannot cover the scenario in which the cached ShuffleDescriptor blob is replaced every time 
a new task is deployed (the size limit here is 1). Another option would be to modify 
`testBuildDeploymentDescriptor` so that it deploys all the tasks. WDYT?
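To make the scenario concrete, here is a minimal sketch (based on the `BlobCacheSizeTracker` API added in this PR, not an actual test) of what a size limit of 1 means for the cache:

```java
// Sketch only: with a limit of 1, every newly tracked ShuffleDescriptor blob
// forces the previously tracked one out of the cache.
BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(1L);
JobID jobId = new JobID();

BlobKey first = BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB);
tracker.track(jobId, first, 1);

// The blob for the next deployed task (also of size 1) would exceed the limit, so the
// tracker reports the previous blob for deletion before the new one can be stored.
List<Tuple2<JobID, BlobKey>> toDelete = tracker.checkLimit(1);
// toDelete == [(jobId, first)]
```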

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private static final int blobSize = 10_000;
+
+    private static final Random random = new Random();
+
+    private static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        temporaryFolder.create();
+    }
+
+    @AfterClass
+    public static void cleanUp() throws IOException {
+        temporaryFolder.delete();
+    }
+
+    @Test
+    public void testSizeLimitTrackerStatusUpdate() {
+        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L);
+        List<JobID> jobIds = new ArrayList<>();
+        List<BlobKey> blobKeys = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            jobIds.add(new JobID());
+            blobKeys.add(BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB));
+        }
+        for (int i = 0; i < 5; i++) {
+            tracker.track(jobIds.get(i), blobKeys.get(i), 1);
+        }
+        tracker.update(jobIds.get(1), blobKeys.get(1));
+        tracker.update(jobIds.get(2), blobKeys.get(2));
+
+        List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2);
+
+        assertThat(
+                blobsToDelete,
+                containsInAnyOrder(
+                        Tuple2.of(jobIds.get(0), blobKeys.get(0)),
+                        Tuple2.of(jobIds.get(3), blobKeys.get(3))));
+    }
+
+    @Test
+    public void testTrackPermanentBlobCache() throws Exception {

Review comment:
       Moved to `PermanentBlobCacheSizeLimitTest`, which is a test for the 
feature.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -247,6 +404,7 @@ public void run() {
 
                         boolean success = false;
                         try {
+                            blobCacheSizeTracker.unregisterJob(jobId);

Review comment:
       I'm not sure about that. `PermanentBlobCache#releaseJob()` only decrements a reference count in 
`jobRefCounter`. A job is only truly released once the reference count reaches zero and the 
`cleanupInterval` has passed. For example, if the TM has only one slot, the blob cache would be 
removed every time a task finishes.
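Just for context, the behavior I'm describing looks roughly like this (a simplified sketch with illustrative names, not the actual `PermanentBlobCache` code):

```java
// Simplified sketch of the deferred cleanup described above.
class RefCount {
    int references;
    long keepUntil = -1; // only set once the count drops to zero
}

Map<JobID, RefCount> jobRefCounters = new HashMap<>();
long cleanupInterval = 3_600_000L; // cleanup interval in milliseconds

void releaseJob(JobID jobId) {
    RefCount ref = jobRefCounters.get(jobId);
    if (ref != null && ref.references > 0) {
        // releaseJob() only decrements the counter ...
        ref.references--;
        if (ref.references == 0) {
            // ... the job merely becomes eligible for cleanup; the periodic
            // cleanup task removes the blobs after keepUntil has passed.
            ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
        }
    }
}
```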

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID and the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            deleteFile(key.f0, key.f1);
+            blobCacheSizeTracker.untrack(key);
+        }
+
+        // Move the file and register it to the tracker
+        BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, log, blobStore);
+        blobCacheSizeTracker.track(jobId, blobKey, localFile.length());
+    }
+
+    /**
+     * Delete the blob file with the given key.
+     *
+     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+     * @param blobKey The key of the desired BLOB.
+     */
+    private void deleteFile(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       Done.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query
+ * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first

Review comment:
       Yes, you are right. Since we no longer try to hold the WRITE/READ lock inside the tracker, 
there is no deadlock here. However, if `track`/`untrack`/`checkLimit` were not guarded by the 
WRITE/READ lock, concurrent threads could check the limit in between `checkLimit` and `untrack`, 
leaving the tracker inconsistent.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest

Review comment:
       Thank you for pointing this out. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID and the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            deleteFile(key.f0, key.f1);
+            blobCacheSizeTracker.untrack(key);

Review comment:
       Since we use a for-loop here, there is no infinite loop. The only risk is that we would try to 
delete the blob every time a new blob is inserted, until the deletion succeeds. I'll make `deleteFile` 
return the result of the deletion and skip untracking the blob if the deletion fails.
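Roughly, the adjusted loop would look like this (a sketch of the intended change, assuming `deleteFile` is changed to return a boolean):

```java
for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
    // Only untrack the entry if the file was actually deleted, so that a failed
    // deletion is retried the next time the limit is checked.
    if (deleteFile(key.f0, key.f1)) {
        blobCacheSizeTracker.untrack(key);
    }
}
```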

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private static final int blobSize = 10_000;
+
+    private static final Random random = new Random();
+
+    private static final TemporaryFolder temporaryFolder = new TemporaryFolder();

Review comment:
       Thank you for pointing this out. I got to learn how `@Rule` works.
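For reference, the `@Rule`-based setup would look like this (standard JUnit 4 usage: the folder is created before each test method and removed afterwards, so the manual `@BeforeClass`/`@AfterClass` handling is no longer needed):

```java
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class BlobCacheSizeTrackerTest extends TestLogger {

    // JUnit 4 creates a fresh folder before every test and deletes it afterwards.
    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
}
```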

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private static final int blobSize = 10_000;
+
+    private static final Random random = new Random();
+
+    private static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        temporaryFolder.create();
+    }
+
+    @AfterClass
+    public static void cleanUp() throws IOException {
+        temporaryFolder.delete();
+    }
+
+    @Test
+    public void testSizeLimitTrackerStatusUpdate() {
+        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L);
+        List<JobID> jobIds = new ArrayList<>();
+        List<BlobKey> blobKeys = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            jobIds.add(new JobID());
+            blobKeys.add(BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB));
+        }
+        for (int i = 0; i < 5; i++) {
+            tracker.track(jobIds.get(i), blobKeys.get(i), 1);
+        }
+        tracker.update(jobIds.get(1), blobKeys.get(1));
+        tracker.update(jobIds.get(2), blobKeys.get(2));
+
+        List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2);
+
+        assertThat(
+                blobsToDelete,
+                containsInAnyOrder(
+                        Tuple2.of(jobIds.get(0), blobKeys.get(0)),
+                        Tuple2.of(jobIds.get(3), blobKeys.get(3))));
+    }
+
+    @Test
+    public void testTrackPermanentBlobCache() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        // The size limit is the size of 2 blobs
+        int sizeLimit = 2;
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config,
+                                new VoidBlobStore(),
+                                new InetSocketAddress("localhost", server.getPort()),
+                                sizeLimit * blobSize)) {
+
+            server.start();
+
+            // Initialize the data structures
+            JobID[] jobId = new JobID[4];
+            byte[][] data = new byte[4][blobSize];
+            BlobKey[] key = new BlobKey[4];
+            File[] file = new File[4];
+
+            // Prepare the blob data
+            byte[] blobData = new byte[blobSize];
+            random.nextBytes(blobData);
+
+            for (int i = 0; i < 4; i++) {
+                jobId[i] = new JobID();
+                data[i] = Arrays.copyOf(blobData, blobData.length);
+                data[i][i] ^= 1;
+            }
+
+            // Put the first, the second and the third BLOB into the blob server one by one
+            // and let the cache retrieve the data from the blob
+            for (int i = 0; i < 3; i++) {
+                key[i] = put(server, jobId[i], data[i], blobType);
+                assertNotNull(key[i]);
+
+                readFileAndVerifyContent(cache, jobId[i], key[i], data[i]);
+                file[i] = getFile(cache, jobId[i], key[i]);
+                assertTrue(file[i].exists());
+            }
+
+            // Since the size limit of the cache is the size of 2 blobs,
+            // the first blob is removed and the second one remains
+            assertFalse(file[0].exists());
+            assertTrue(file[1].exists());
+
+            // Get the second blob once again, making the third blob the least recently used
+            readFileAndVerifyContent(cache, jobId[1], key[1], data[1]);
+
+            // Then get the fourth blob, make sure the third blob is replaced.
+            // Here we cannot get the first blob again, because for transient blob,
+            // BlobServer will delete the blob once it's transmitted.
+            key[3] = put(server, jobId[3], data[3], blobType);
+            readFileAndVerifyContent(cache, jobId[3], key[3], data[3]);
+            file[3] = getFile(cache, jobId[3], key[3]);
+
+            assertTrue(file[1].exists());
+            assertTrue(file[3].exists());
+            assertFalse(file[2].exists());
+        }
+    }
+
+    @Test
+    public void testGetPermanentCacheWithSizeLimitConcurrently() throws Exception {

Review comment:
       Moved to `PermanentBlobCacheSizeLimitTest`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query
+ * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the last used index for the BLOBs so that the tracker can easily find out the least
+     * recently used BLOBs.
+     */
+    public void update(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        synchronized (lock) {
+            caches.get(Tuple2.of(jobId, blobKey));
+        }
+    }
+
+    /** Unregister all the tracked BLOBs related to given job. */
+    public void unregisterJob(JobID jobId) {
+        checkNotNull(jobId);
+
+        synchronized (lock) {
+            for (BlobKey blobKey : blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>())) {
+                untrack(jobId, blobKey);
+            }
+            blobKeyByJob.remove(jobId);
+        }
+    }
+
+    @VisibleForTesting
+    Set<BlobKey> getBlobKeysByJobId(JobID jobId) {

Review comment:
       It's not used in production. It's used in 
`BlobCacheCleanupTest#checkCacheSizeTracker`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query
+ * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first

Review comment:
       The comment has been improved slightly.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
##########
@@ -45,4 +46,21 @@
      * @throws IOException if any other error occurs when retrieving the file
      */
     File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
+
+    /**
+     * Directly read the file associated with the provided job ID and blob key.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read when

Review comment:
       Yes, I agree that the description here is not accurate. Maybe we should describe the 
concurrency behavior in the Javadoc of `readFile` in `PermanentBlobCache` rather than here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -102,6 +123,9 @@ public PermanentBlobCache(
         this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
         this.cleanupTimer.schedule(
                 new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
+
+        // Configure the size limit tracker

Review comment:
       Agreed. Removed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query
+ * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {

Review comment:
       Sorry for being careless. I've noticed that. Resolved.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {

Review comment:
       Yes. Sorry for being careless. I've removed it.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses a {@link LinkedHashMap} to maintain the LRU order of the files in the
+ * cache. Before a new file is put into the cache, {@code checkLimit} is called to find out which
+ * files have to be removed first. The tracker guards its state with an internal lock to avoid
+ * concurrent modification. To avoid deadlocks, make sure to acquire the READ/WRITE lock in
+ * {@link PermanentBlobCache} first and only then acquire the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to be put into the cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {

Review comment:
       For sanity, I added a `checkState` here.
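
       A rough sketch of what the guarded version could look like (assuming a static
       import of `Preconditions.checkState`; the message text is illustrative):

           synchronized (lock) {
               if (key.f0 != null) {
                   blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1);
               }
               Long size = caches.remove(key);
               if (size != null) {
                   // a tracked size must never be negative; fail fast if it is
                   checkState(size >= 0, "The size of a tracked BLOB must not be negative.");
                   total -= size;
               }
           }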

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID and blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read
+     * while the lock that guards access to the file is held. This avoids the scenario where the
+     * returned path points to a file that is concurrently deleted by another thread.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return the content of the BLOB
+     * @throws java.io.FileNotFoundException if the BLOB does not exist
+     * @throws IOException if any other error occurs when retrieving the file
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,

Review comment:
       Done.
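
       For readers skimming the thread: the locking above follows the ordering rule
       from the tracker's Javadoc, i.e. take the cache's READ/WRITE lock first and
       only then let the tracker take its own internal lock inside its methods. A
       rough sketch of how `checkLimitAndMoveFile` is expected to drive the tracker
       under the write lock (the file moving and deletion steps are omitted; names
       are illustrative):

           readWriteLock.writeLock().lock();
           try {
               // the tracker synchronizes on its own internal lock inside these calls
               for (Tuple2<JobID, BlobKey> keyToEvict : blobCacheSizeTracker.checkLimit(size)) {
                   // delete the evicted file from local storage here, then untrack it
                   blobCacheSizeTracker.untrack(keyToEvict);
               }
               // ... move incomingFile to localFile here ...
               blobCacheSizeTracker.track(jobId, blobKey, localFile.length());
           } finally {
               readWriteLock.writeLock().unlock();
           }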

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private static final int blobSize = 10_000;
+
+    private static final Random random = new Random();
+
+    private static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        temporaryFolder.create();
+    }
+
+    @AfterClass
+    public static void cleanUp() throws IOException {
+        temporaryFolder.delete();
+    }
+
+    @Test
+    public void testSizeLimitTrackerStatusUpdate() {

Review comment:
       I get it. I'll add more tests for the functions of `BlobCacheSizeTracker`.
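
       For example, one of the added tests could exercise `checkLimit` directly,
       along these lines (a sketch only, not necessarily the final test; it relies
       on the test class living in the same package so `BlobKey` and
       `PermanentBlobKey` are accessible):

           @Test
           public void testCheckLimitReturnsLeastRecentlyUsedBlob() {
               BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(10L);
               JobID jobId = new JobID();
               PermanentBlobKey key1 = new PermanentBlobKey();
               PermanentBlobKey key2 = new PermanentBlobKey();

               tracker.track(jobId, key1, 6L);
               tracker.track(jobId, key2, 4L);

               // a new 5-byte file does not fit, so the least recently used BLOB (key1)
               // should be reported for deletion, which frees enough space for the new file
               List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(5L);
               assertEquals(1, blobsToDelete.size());
               assertEquals(Tuple2.of(jobId, key1), blobsToDelete.get(0));
           }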

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses a {@link LinkedHashMap} to maintain the LRU order of the files in the
+ * cache. Before a new file is put into the cache, {@code checkLimit} is called to find out which
+ * files have to be removed first. The tracker guards its state with an internal lock to avoid
+ * concurrent modification. To avoid deadlocks, make sure to acquire the READ/WRITE lock in
+ * {@link PermanentBlobCache} first and only then acquire the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to be put into the cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       Thank you for pointing this out. I shouldn't ignore this. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses a {@link LinkedHashMap} to maintain the LRU order of the files in the
+ * cache. Before a new file is put into the cache, {@code checkLimit} is called to find out which
+ * files have to be removed first. The tracker guards its state with an internal lock to avoid
+ * concurrent modification. To avoid deadlocks, make sure to acquire the READ/WRITE lock in
+ * {@link PermanentBlobCache} first and only then acquire the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to be put into the cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the last used index for the given BLOB so that the tracker can easily find out the
+     * least recently used BLOBs.
+     */
+    public void update(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        synchronized (lock) {
+            caches.get(Tuple2.of(jobId, blobKey));
+        }
+    }
+
+    /** Unregister all the tracked BLOBs related to given job. */
+    public void unregisterJob(JobID jobId) {

Review comment:
       Thank you for suggesting a better name. I'd been thinking about the naming for a long time. Resolved.
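
       For reference, the renamed method keeps the same bulk removal and could look
       roughly like this (the name `untrackAll` is only illustrative here):

           /** Remove all the tracked BLOBs that belong to the given job. */
           public void untrackAll(JobID jobId) {
               checkNotNull(jobId);

               synchronized (lock) {
                   Set<BlobKey> keysToRemove = blobKeyByJob.remove(jobId);
                   if (keysToRemove != null) {
                       for (BlobKey blobKey : keysToRemove) {
                           Long size = caches.remove(Tuple2.of(jobId, blobKey));
                           if (size != null && size >= 0) {
                               total -= size;
                           }
                       }
                   }
               }
           }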

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -90,6 +96,21 @@ public PermanentBlobCache(
             @Nullable final InetSocketAddress serverAddress)
             throws IOException {
 
+        this(
+                blobClientConfig,
+                blobView,
+                serverAddress,
+                new BlobCacheSizeTracker(MemorySize.ofMebiBytes(100).getBytes()));

Review comment:
       Thank you for providing this suggestion. Done.
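
       As a small usage illustration, a caller (e.g., a test) that wants a
       non-default limit could pass its own tracker through the new constructor,
       assuming the constructor is accessible from the call site (the 512 MiB value
       and the local variable names are illustrative):

           // use a 512 MiB cache limit instead of the default
           BlobCacheSizeTracker tracker =
                   new BlobCacheSizeTracker(MemorySize.ofMebiBytes(512).getBytes());
           PermanentBlobCache blobCache =
                   new PermanentBlobCache(blobClientConfig, blobView, serverAddress, tracker);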

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses a {@link LinkedHashMap} to maintain the LRU order of the files in the
+ * cache. Before a new file is put into the cache, {@code checkLimit} is called to find out which
+ * files have to be removed first. The tracker guards its state with an internal lock to avoid
+ * concurrent modification. To avoid deadlocks, make sure to acquire the READ/WRITE lock in
+ * {@link PermanentBlobCache} first and only then acquire the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to be put into the cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the last used index for the given BLOB so that the tracker can easily find out the
+     * least recently used BLOBs.
+     */
+    public void update(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       Done.
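
       A small note for anyone wondering why a bare `caches.get(...)` is enough here:
       the map is constructed with `accessOrder = true`, so a successful `get` moves
       the entry to the end of the iteration order, which is exactly the LRU refresh
       that `checkLimit` relies on. A tiny standalone illustration:

           LinkedHashMap<String, Long> lru = new LinkedHashMap<>(16, 0.75F, true);
           lru.put("a", 1L);
           lru.put("b", 2L);
           lru.get("a");                     // "a" becomes the most recently used entry
           System.out.println(lru.keySet()); // prints [b, a]; iteration starts at the LRU entry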

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses a {@link LinkedHashMap} to maintain the LRU order of the files in the
+ * cache. Before a new file is put into the cache, {@code checkLimit} is called to find out which
+ * files have to be removed first. The tracker guards its state with an internal lock to avoid
+ * concurrent modification. To avoid deadlocks, make sure to acquire the READ/WRITE lock in
+ * {@link PermanentBlobCache} first and only then acquire the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to be put into the cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);

Review comment:
       Alright. If a key is already tracked, the duplicate is ignored and a warning is logged.
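
       A rough sketch of that guard inside `track` (the log message is illustrative):

           synchronized (lock) {
               if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
                   if (jobId != null) {
                       blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
                   }
                   total += size;
               } else {
                   // the key is already tracked; keep the existing entry and just warn
                   LOG.warn(
                           "The BLOB {} for job {} is already tracked, ignoring the duplicate.",
                           blobKey,
                           jobId);
               }
           }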




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to