This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c475d4  [FLINK-23354][blob] Limit the size of ShuffleDescriptors in 
PermanentBlobCache on TaskExecutor
5c475d4 is described below

commit 5c475d41fea3c81557e0d463bed1c94024dd0da5
Author: Thesharing <[email protected]>
AuthorDate: Thu Jul 15 15:24:36 2021 +0800

    [FLINK-23354][blob] Limit the size of ShuffleDescriptors in 
PermanentBlobCache on TaskExecutor
    
    This closes #16498.
---
 .../flink/runtime/blob/BlobCacheSizeTracker.java   | 204 +++++++++++++++++
 .../flink/runtime/blob/PermanentBlobCache.java     | 162 +++++++++++++
 .../flink/runtime/blob/PermanentBlobService.java   |  21 ++
 .../deployment/InputGateDeploymentDescriptor.java  |   5 +-
 .../flink/runtime/blob/BlobCacheCleanupTest.java   |  19 +-
 .../runtime/blob/BlobCacheSizeTrackerTest.java     | 183 +++++++++++++++
 .../blob/PermanentBlobCacheSizeLimitTest.java      | 255 +++++++++++++++++++++
 ...hDeploymentWithSmallBlobCacheSizeLimitTest.java | 194 ++++++++++++++++
 8 files changed, 1036 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
new file mode 100644
index 0000000..0484389
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the inconsistency, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache}
+ * first and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    private final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        checkArgument(sizeLimit > 0);
+
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the BLOBs to delete.
+     *
+     * @param size size of the BLOB intended to put into the cache
+     * @return list of BLOBs to delete before putting into the target BLOB
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the BLOB to the tracker. */
+    public void track(JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+
+                total += size;
+                if (total > sizeLimit) {
+                    LOG.warn(
+                            "The overall size of BLOBs in the cache exceeds "
+                                    + "the limit. Limit = [{}], Current: [{}], 
"
+                                    + "The size of next BLOB: [{}].",
+                            sizeLimit,
+                            total,
+                            size);
+                }
+            } else {
+                LOG.warn(
+                        "Attempt to track a duplicated BLOB. This may indicate 
a duplicate upload "
+                                + "or a hash collision. Ignoring newest 
upload. "
+                                + "JobID = [{}], BlobKey = [{}]",
+                        jobId,
+                        blobKey);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+        checkNotNull(key.f0);
+        checkNotNull(key.f1);
+
+        synchronized (lock) {
+            blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+
+            Long size = caches.remove(key);
+            if (size != null) {
+                checkState(size >= 0);
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    private void untrack(JobID jobId, BlobKey blobKey) {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the least used index for the BLOBs so that the tracker can 
easily find out the least
+     * recently used BLOBs.
+     */
+    public void update(JobID jobId, BlobKey blobKey) {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        synchronized (lock) {
+            caches.get(Tuple2.of(jobId, blobKey));
+        }
+    }
+
+    /** Unregister all the tracked BLOBs related to the given job. */
+    public void untrackAll(JobID jobId) {
+        checkNotNull(jobId);
+
+        synchronized (lock) {
+            Set<BlobKey> keysToRemove = blobKeyByJob.remove(jobId);
+            if (keysToRemove != null) {
+                for (BlobKey key : keysToRemove) {
+                    untrack(jobId, key);
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    Long getSize(JobID jobId, BlobKey blobKey) {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        synchronized (lock) {
+            return caches.get(Tuple2.of(jobId, blobKey));
+        }
+    }
+
+    @VisibleForTesting
+    Set<BlobKey> getBlobKeysByJobId(JobID jobId) {
+        checkNotNull(jobId);
+
+        synchronized (lock) {
+            return blobKeyByJob.getOrDefault(jobId, Collections.emptySet());
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
index 0f64667..f2238ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.util.FileUtils;
 
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
@@ -33,6 +36,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -65,6 +69,8 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
         public long keepUntil = -1;
     }
 
+    private static final int DEFAULT_SIZE_LIMIT_MB = 100;
+
     /** Map to store the number of references to a specific job. */
     private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
 
@@ -74,6 +80,8 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
     /** Timer task to execute the cleanup at regular intervals. */
     private final Timer cleanupTimer;
 
+    private final BlobCacheSizeTracker blobCacheSizeTracker;
+
     /**
      * Instantiates a new cache for permanent BLOBs which are also available 
in an HA store.
      *
@@ -90,6 +98,21 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
             @Nullable final InetSocketAddress serverAddress)
             throws IOException {
 
+        this(
+                blobClientConfig,
+                blobView,
+                serverAddress,
+                new 
BlobCacheSizeTracker(MemorySize.ofMebiBytes(DEFAULT_SIZE_LIMIT_MB).getBytes()));
+    }
+
+    @VisibleForTesting
+    public PermanentBlobCache(
+            final Configuration blobClientConfig,
+            final BlobView blobView,
+            @Nullable final InetSocketAddress serverAddress,
+            BlobCacheSizeTracker blobCacheSizeTracker)
+            throws IOException {
+
         super(
                 blobClientConfig,
                 blobView,
@@ -102,6 +125,8 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
         this.cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
         this.cleanupTimer.schedule(
                 new PermanentBlobCleanupTask(), cleanupInterval, 
cleanupInterval);
+
+        this.blobCacheSizeTracker = blobCacheSizeTracker;
     }
 
     /**
@@ -188,6 +213,142 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
     }
 
     /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local 
cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or 
directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario 
that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            if (deleteFile(key.f0, key.f1)) {
+                blobCacheSizeTracker.untrack(key);
+            }
+        }
+
+        // Move the file and register it to the tracker
+        BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, 
log, blobStore);
+        blobCacheSizeTracker.track(jobId, blobKey, localFile.length());
+    }
+
+    /**
+     * Delete the blob file with the given key.
+     *
+     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
+     * @param blobKey The key of the desired BLOB.
+     */
+    private boolean deleteFile(JobID jobId, BlobKey blobKey) {
+        final File localFile =
+                new File(
+                        BlobUtils.getStorageLocationPath(
+                                storageDir.getAbsolutePath(), jobId, blobKey));
+        if (!localFile.delete() && localFile.exists()) {
+            log.warn(
+                    "Failed to delete locally cached BLOB {} at {}",
+                    blobKey,
+                    localFile.getAbsolutePath());
+            return false;
+        }
+        return true;
+    }
+
+    /**
      * Returns a file handle to the file associated with the given blob key on 
the blob server.
      *
      * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
@@ -247,6 +408,7 @@ public class PermanentBlobCache extends AbstractBlobCache 
implements PermanentBl
 
                         boolean success = false;
                         try {
+                            blobCacheSizeTracker.untrackAll(jobId);
                             FileUtils.deleteDirectory(localFile);
                             success = true;
                         } catch (Throwable t) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
index 6fe10a3..b12458e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FileUtils;
 
 import java.io.Closeable;
 import java.io.File;
@@ -45,4 +46,24 @@ public interface PermanentBlobService extends Closeable {
      * @throws IOException if any other error occurs when retrieving the file
      */
     File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
+
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} will attempt to read 
the entire file after
+     * retrieving it. If file reading and file retrieving is done in the same 
WRITE lock, it can
+     * avoid the scenario that the path to the file is deleted concurrently by 
other threads when
+     * the file is retrieved but not read yet.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param key BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    default byte[] readFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+        // The default implementation doesn't guarantee that the file won't be 
deleted concurrently
+        // by other threads while reading the contents.
+        return FileUtils.readAllBytes(getFile(jobId, key).toPath());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 6863219..9b10675 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -31,13 +31,11 @@ import 
org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.CompressedSerializedValue;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -128,12 +126,11 @@ public class InputGateDeploymentDescriptor implements 
Serializable {
 
             Preconditions.checkNotNull(blobService);
 
-            final File dataFile = blobService.getFile(jobId, blobKey);
             // NOTE: Do not delete the ShuffleDescriptor BLOBs since it may be 
needed again during
             // recovery. (it is deleted automatically on the BLOB server and 
cache when its
             // partition is no longer available or the job enters a terminal 
state)
             CompressedSerializedValue<ShuffleDescriptor[]> serializedValue =
-                    
CompressedSerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
+                    
CompressedSerializedValue.fromBytes(blobService.readFile(jobId, blobKey));
             serializedInputChannels = new NonOffloaded<>(serializedValue);
 
             Preconditions.checkNotNull(serializedInputChannels);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index d9a5699..c70c991 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -236,7 +237,9 @@ public class BlobCacheCleanupTest extends TestLogger {
             server = new BlobServer(config, new VoidBlobStore());
             server.start();
             InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-            cache = new PermanentBlobCache(config, new VoidBlobStore(), 
serverAddress);
+            final BlobCacheSizeTracker tracker =
+                    new 
BlobCacheSizeTracker(MemorySize.ofMebiBytes(100).getBytes());
+            cache = new PermanentBlobCache(config, new VoidBlobStore(), 
serverAddress, tracker);
 
             // upload blobs
             keys.add(server.putPermanent(jobId, buf));
@@ -245,26 +248,29 @@ public class BlobCacheCleanupTest extends TestLogger {
 
             checkFileCountForJob(2, jobId, server);
             checkFileCountForJob(0, jobId, cache);
+            checkBlobCacheSizeTracker(tracker, jobId, 0);
 
             // register once
             cache.registerJob(jobId);
 
             checkFileCountForJob(2, jobId, server);
             checkFileCountForJob(0, jobId, cache);
+            checkBlobCacheSizeTracker(tracker, jobId, 0);
 
             for (PermanentBlobKey key : keys) {
-                cache.getFile(jobId, key);
+                cache.readFile(jobId, key);
             }
 
             // register again (let's say, from another thread or so)
             cache.registerJob(jobId);
             for (PermanentBlobKey key : keys) {
-                cache.getFile(jobId, key);
+                cache.readFile(jobId, key);
             }
 
             assertEquals(2, checkFilesExist(jobId, keys, cache, true));
             checkFileCountForJob(2, jobId, server);
             checkFileCountForJob(2, jobId, cache);
+            checkBlobCacheSizeTracker(tracker, jobId, 2);
 
             // after releasing once, nothing should change
             cache.releaseJob(jobId);
@@ -272,6 +278,7 @@ public class BlobCacheCleanupTest extends TestLogger {
             assertEquals(2, checkFilesExist(jobId, keys, cache, true));
             checkFileCountForJob(2, jobId, server);
             checkFileCountForJob(2, jobId, cache);
+            checkBlobCacheSizeTracker(tracker, jobId, 2);
 
             // after releasing the second time, the job is up for deferred 
cleanup
             cache.releaseJob(jobId);
@@ -289,6 +296,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 
             // files are up for cleanup now...wait for it:
             verifyJobCleanup(cache, jobId, keys);
+            checkBlobCacheSizeTracker(tracker, jobId, 0);
             // server should be unaffected
             checkFileCountForJob(2, jobId, server);
         } finally {
@@ -447,4 +455,9 @@ public class BlobCacheCleanupTest extends TestLogger {
         // this fails if we exited via a timeout
         checkFileCountForJob(0, jobId, cache);
     }
+
+    private static void checkBlobCacheSizeTracker(
+            BlobCacheSizeTracker tracker, JobID jobId, int expected) {
+        assertEquals(tracker.getBlobKeysByJobId(jobId).size(), expected);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
new file mode 100644
index 0000000..f8d6424
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.BlobKey.BlobType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private BlobCacheSizeTracker tracker;
+    private JobID jobId;
+    private BlobKey blobKey;
+
+    @Before
+    public void setup() {
+        tracker = new BlobCacheSizeTracker(5L);
+        jobId = new JobID();
+        blobKey = BlobKey.createKey(BlobType.PERMANENT_BLOB);
+
+        tracker.track(jobId, blobKey, 3L);
+    }
+
+    @Test
+    public void testCheckLimit() {
+        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(3L);
+
+        assertEquals(1, keys.size());
+        assertEquals(jobId, keys.get(0).f0);
+        assertEquals(blobKey, keys.get(0).f1);
+    }
+
+    /** If an empty BLOB is intended to be stored, no BLOBs should be removed. 
*/
+    @Test
+    public void testCheckLimitForEmptyBlob() {
+        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(0L);
+
+        assertEquals(0, keys.size());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCheckLimitForBlobWithNegativeSize() {
+        tracker.checkLimit(-1L);
+    }
+
+    @Test
+    public void testTrack() {
+        assertEquals(3L, (long) tracker.getSize(jobId, blobKey));
+        assertTrue(tracker.getBlobKeysByJobId(jobId).contains(blobKey));
+    }
+
+    /**
+     * When trying to track a duplicated BLOB, the new one will be ignored, 
just like {@link
+     * BlobUtils#moveTempFileToStore} does.
+     */
+    @Test
+    public void testTrackDuplicatedBlob() {
+        tracker.track(jobId, blobKey, 1L);
+        assertEquals(3L, (long) tracker.getSize(jobId, blobKey));
+        assertEquals(1, tracker.getBlobKeysByJobId(jobId).size());
+    }
+
+    @Test
+    public void testUntrack() {
+        assertEquals(1, tracker.checkLimit(3L).size());
+        tracker.untrack(Tuple2.of(jobId, blobKey));
+
+        assertNull(tracker.getSize(jobId, blobKey));
+        assertEquals(0, tracker.getBlobKeysByJobId(jobId).size());
+        assertEquals(0, tracker.checkLimit(3L).size());
+    }
+
+    /** Untracking a non-existing BLOB shouldn't change anything or throw any 
exceptions. */
+    @Test
+    public void testUntrackNonExistingBlob() {
+        tracker.untrack(Tuple2.of(jobId, 
BlobKey.createKey(BlobType.PERMANENT_BLOB)));
+        assertEquals(1, tracker.getBlobKeysByJobId(jobId).size());
+    }
+
+    /**
+     * Since the BlobCacheSizeLimitTracker only works in {@link 
PermanentBlobCache}, the JobID
+     * shouldn't be null.
+     */
+    @Test(expected = NullPointerException.class)
+    public void testUntrackBlobWithNullJobId() {
+        tracker.untrack(Tuple2.of(null, 
BlobKey.createKey(BlobType.PERMANENT_BLOB)));
+    }
+
+    @Test
+    public void testUpdate() {
+        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L);
+        List<JobID> jobIds = new ArrayList<>();
+        List<BlobKey> blobKeys = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            jobIds.add(new JobID());
+            blobKeys.add(BlobKey.createKey(BlobType.PERMANENT_BLOB));
+        }
+        for (int i = 0; i < 5; i++) {
+            tracker.track(jobIds.get(i), blobKeys.get(i), 1);
+        }
+        tracker.update(jobIds.get(1), blobKeys.get(1));
+        tracker.update(jobIds.get(2), blobKeys.get(2));
+
+        List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2);
+
+        assertThat(
+                blobsToDelete,
+                containsInAnyOrder(
+                        Tuple2.of(jobIds.get(0), blobKeys.get(0)),
+                        Tuple2.of(jobIds.get(3), blobKeys.get(3))));
+    }
+
+    /**
+     * Updating the status for a non-existing BLOB shouldn't change anything 
or throw any
+     * exceptions.
+     */
+    @Test
+    public void testUpdateNonExistingBlob() {
+        tracker.track(new JobID(), BlobKey.createKey(BlobType.PERMANENT_BLOB), 
2L);
+        assertEquals(1, tracker.checkLimit(3L).size());
+
+        tracker.update(new JobID(), 
BlobKey.createKey(BlobType.PERMANENT_BLOB));
+        assertEquals(1, tracker.checkLimit(3L).size());
+    }
+
+    @Test
+    public void testUntrackAll() {
+        tracker.track(jobId, BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);
+
+        JobID anotherJobId = new JobID();
+        tracker.track(anotherJobId, 
BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);
+
+        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
+        tracker.untrackAll(jobId);
+
+        assertEquals(0, tracker.getBlobKeysByJobId(jobId).size());
+        assertEquals(1, tracker.getBlobKeysByJobId(anotherJobId).size());
+    }
+
+    /**
+     * Untracking all BLOBs for a non-existing job shouldn't change anything 
or throw any
+     * exceptions.
+     */
+    @Test
+    public void testUntrackAllWithNonExistingJob() {
+        tracker.track(jobId, BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);
+
+        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
+        tracker.untrackAll(new JobID());
+
+        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
new file mode 100644
index 0000000..a9272f5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for using {@link BlobCacheSizeTracker} to track the size of BLOBs in 
{@link
+ * PermanentBlobCache}. When new BLOBs are intended to be stored and the size 
limit exceeds, {@link
+ * BlobCacheSizeTracker} will provide excess BLOBs for {@link 
PermanentBlobCache} to delete.
+ */
+public class PermanentBlobCacheSizeLimitTest {
+
+    private static final Random RANDOM = new Random();
+
+    private static final BlobKey.BlobType BLOB_TYPE = 
BlobKey.BlobType.PERMANENT_BLOB;
+    private static final int BLOB_SIZE = 10_000;
+    // The size limit is the size of 2 blobs
+    private static final int MAX_NUM_OF_ACCEPTED_BLOBS = 2;
+    private static final int TOTAL_NUM_OF_BLOBS = 3;
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private Configuration config;
+
+    @Before
+    public void setup() throws IOException {
+        config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+    }
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcessSequentially() throws 
Exception {
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config, new InetSocketAddress("localhost", 
server.getPort()))) {
+
+            server.start();
+
+            // Put the BLOBs into the blob server
+            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);
+
+            // The cache retrieves the BLOBs from the server sequentially
+            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+
+                // Retrieve the BLOB from the blob server
+                readFileAndVerifyContent(cache, blobs[i].jobId, 
blobs[i].blobKey, blobs[i].data);
+
+                // Retrieve the location of BLOBs from the blob cache
+                blobs[i].blobFile = getFile(cache, blobs[i].jobId, 
blobs[i].blobKey);
+                assertTrue(blobs[i].blobFile.exists());
+            }
+
+            // Since the size limit of the blob cache is the size of 2 BLOBs,
+            // the first BLOB is removed and the second BLOB remains
+            assertFalse(blobs[0].blobFile.exists());
+            assertTrue(blobs[1].blobFile.exists());
+
+            // Retrieve the second BLOB once again,
+            // make the third BLOB to be the least recently used
+            readFileAndVerifyContent(cache, blobs[1].jobId, blobs[1].blobKey, 
blobs[1].data);
+
+            // Then retrieve the first BLOB again, make sure the third BLOB is 
replaced
+            blobs[0].blobKey = put(server, blobs[0].jobId, blobs[0].data, 
BLOB_TYPE);
+            readFileAndVerifyContent(cache, blobs[0].jobId, blobs[0].blobKey, 
blobs[0].data);
+            blobs[0].blobFile = getFile(cache, blobs[0].jobId, 
blobs[0].blobKey);
+
+            assertTrue(blobs[0].blobFile.exists());
+            assertTrue(blobs[1].blobFile.exists());
+            assertFalse(blobs[2].blobFile.exists());
+        }
+    }
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcessConcurrently() throws 
Exception {
+
+        final ExecutorService executor = 
Executors.newFixedThreadPool(TOTAL_NUM_OF_BLOBS);
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config, new InetSocketAddress("localhost", 
server.getPort()))) {
+
+            server.start();
+
+            // Put the BLOBs into the blob server
+            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);
+
+            final List<CompletableFuture<Void>> futures = new 
ArrayList<>(TOTAL_NUM_OF_BLOBS);
+
+            // The blob cache retrieves the BLOB from the server concurrently
+            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+                int idx = i;
+                CompletableFuture<Void> future =
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    try {
+                                        // Retrieve the BLOB from the blob 
server
+                                        readFileAndVerifyContent(
+                                                cache,
+                                                blobs[idx].jobId,
+                                                blobs[idx].blobKey,
+                                                blobs[idx].data);
+
+                                        // Retrieve the location of BLOBs from 
the blob cache
+                                        blobs[idx].blobFile =
+                                                getFile(
+                                                        cache,
+                                                        blobs[idx].jobId,
+                                                        blobs[idx].blobKey);
+
+                                        return null;
+                                    } catch (IOException e) {
+                                        throw new CompletionException(e);
+                                    }
+                                },
+                                executor);
+
+                futures.add(future);
+            }
+
+            final CompletableFuture<Void> conjunctFuture = 
FutureUtils.waitForAll(futures);
+            conjunctFuture.get();
+
+            // Check how many BLOBs exist in the blob cache
+            int exists = 0, nonExists = 0;
+            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+                if (blobs[i].blobFile.exists()) {
+                    exists++;
+                } else {
+                    nonExists++;
+                }
+            }
+            assertEquals(MAX_NUM_OF_ACCEPTED_BLOBS, exists);
+            assertEquals(TOTAL_NUM_OF_BLOBS - MAX_NUM_OF_ACCEPTED_BLOBS, 
nonExists);
+
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * {@link BlobInfo} contains all the information related to a BLOB (for 
the test purpose only).
+     */
+    private static class BlobInfo {
+        private final JobID jobId;
+        private final byte[] data;
+        private BlobKey blobKey;
+        private File blobFile;
+
+        private BlobInfo() {
+            this.jobId = new JobID();
+
+            this.data = new byte[BLOB_SIZE];
+            RANDOM.nextBytes(this.data);
+        }
+    }
+
+    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws 
IOException {
+        // Initialize the information of BLOBs
+        BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
+
+        // Put all the BLOBs into the blob server one by one
+        for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+            blobs[i] = new BlobInfo();
+
+            // Put the BLOB into the blob server
+            blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data, 
BLOB_TYPE);
+            assertNotNull(blobs[i].blobKey);
+        }
+
+        return blobs;
+    }
+
+    private static BlobCacheService initBlobCacheServiceWithSizeLimit(
+            Configuration config, @Nullable final InetSocketAddress 
serverAddress)
+            throws IOException {
+
+        final PermanentBlobCache permanentBlobCache =
+                new PermanentBlobCache(
+                        config,
+                        new VoidBlobStore(),
+                        serverAddress,
+                        new BlobCacheSizeTracker(MAX_NUM_OF_ACCEPTED_BLOBS * 
BLOB_SIZE));
+
+        final TransientBlobCache transientBlobCache = new 
TransientBlobCache(config, serverAddress);
+
+        return new BlobCacheService(permanentBlobCache, transientBlobCache);
+    }
+
+    private static void readFileAndVerifyContent(
+            BlobService blobService, JobID jobId, BlobKey blobKey, byte[] 
expected)
+            throws IOException {
+
+        assertNotNull(jobId);
+        assertNotNull(blobKey);
+        assertTrue(blobKey instanceof PermanentBlobKey);
+
+        byte[] target =
+                blobService.getPermanentBlobService().readFile(jobId, 
(PermanentBlobKey) blobKey);
+        assertArrayEquals(expected, target);
+    }
+
+    private static File getFile(BlobCacheService blobCacheService, JobID 
jobId, BlobKey blobKey)
+            throws IOException {
+        return 
blobCacheService.getPermanentBlobService().getStorageLocation(jobId, blobKey);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
new file mode 100644
index 0000000..4a37421
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests {@link ExecutionGraph} deployment when job and task information are 
offloaded into the BLOB
+ * server and accessed from the {@link PermanentBlobCache}. {@link 
PermanentBlobCache} uses {@link
+ * BlobCacheSizeTracker} to track the size of ShuffleDescriptor BLOBs. The 
deployment works well
+ * even the size limit of {@link BlobCacheSizeTracker} in {@link 
PermanentBlobCache} is set to the
+ * minimum value.
+ */
+public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and 
cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new 
BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, 
blobCacheSizeTracker);
+    }
+
+    /**
+     * Test the deployment works well even the size limit of {@link 
BlobCacheSizeTracker} in {@link
+     * PermanentBlobCache} is set to the minimum value.
+     *
+     * <p>In this extreme case, since the size limit is 1, every time a task 
is deployed, all the
+     * existing **tracked** BLOBs on the cache must be untracked and deleted 
before the new BLOB is
+     * stored onto the cache.
+     *
+     * <p>This extreme case covers the situation of the normal case, where the 
size limit is much
+     * larger than 1 and the deletion won't happen so frequently.
+     */
+    @Test
+    public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws 
Exception {
+
+        final int numberOfVertices = 4;
+        final int parallelism = 10;
+
+        final ExecutionGraph eg = 
createAndSetupExecutionGraph(numberOfVertices, parallelism);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final BlockingQueue<TaskDeploymentDescriptor> tdds =
+                new ArrayBlockingQueue<>(numberOfVertices * parallelism);
+        taskManagerGateway.setSubmitConsumer(
+                FunctionUtils.uncheckedConsumer(
+                        taskDeploymentDescriptor -> {
+                            taskDeploymentDescriptor.loadBigData(blobCache);
+                            tdds.offer(taskDeploymentDescriptor);
+                        }));
+
+        for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+
+                assertEquals(ExecutionState.CREATED, ev.getExecutionState());
+
+                LogicalSlot slot =
+                        new TestingLogicalSlotBuilder()
+                                .setTaskManagerGateway(taskManagerGateway)
+                                .createTestingLogicalSlot();
+                ev.getCurrentExecutionAttempt()
+                        
.registerProducedPartitions(slot.getTaskManagerLocation(), true)
+                        .get();
+                ev.deployToSlot(slot);
+                assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());
+
+                TaskDeploymentDescriptor tdd = tdds.take();
+                assertNotNull(tdd);
+
+                List<InputGateDeploymentDescriptor> igdds = 
tdd.getInputGates();
+                assertEquals(ev.getAllConsumedPartitionGroups().size(), 
igdds.size());
+
+                if (igdds.size() > 0) {
+                    checkShuffleDescriptors(igdds.get(0), 
ev.getConsumedPartitionGroup(0));
+                }
+            }
+        }
+    }
+
+    private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, 
int parallelism)
+            throws JobException, JobExecutionException {
+
+        final List<JobVertex> vertices = new ArrayList<>();
+
+        for (int i = 0; i < numberOfVertices; i++) {
+            JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new 
JobVertexID());
+            vertex.setParallelism(parallelism);
+            vertex.setInvokableClass(BatchTask.class);
+            vertices.add(vertex);
+        }
+
+        for (int i = 1; i < numberOfVertices; i++) {
+            vertices.get(i)
+                    .connectNewDataSetAsInput(
+                            vertices.get(i - 1),
+                            DistributionPattern.POINTWISE,
+                            ResultPartitionType.BLOCKING);
+        }
+
+        final JobGraph jobGraph =
+                JobGraphTestUtils.batchJobGraph(vertices.toArray(new 
JobVertex[0]));
+
+        final DirectScheduledExecutorService executor = new 
DirectScheduledExecutorService();
+        final DefaultExecutionGraph eg =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setFutureExecutor(executor)
+                        .setIoExecutor(executor)
+                        .setBlobWriter(blobWriter)
+                        .build();
+
+        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return eg;
+    }
+
+    private static void checkShuffleDescriptors(
+            InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup 
consumedPartitionGroup) {
+        int idx = 0;
+        for (IntermediateResultPartitionID consumedPartitionId : 
consumedPartitionGroup) {
+            assertEquals(
+                    consumedPartitionId,
+                    
igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId());
+        }
+    }
+}

Reply via email to