zhuzhurk commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677246841



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) 
throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local 
cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or 
directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario 
that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,

Review comment:
       `jobId` should be non-null for permanent blob cache

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) 
throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local 
cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or 
directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario 
that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            deleteFile(key.f0, key.f1);
+            blobCacheSizeTracker.untrack(key);
+        }
+
+        // Move the file and register it to the tracker
+        BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, 
log, blobStore);
+        blobCacheSizeTracker.track(jobId, blobKey, localFile.length());
+    }
+
+    /**
+     * Delete the blob file with the given key.
+     *
+     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
+     * @param blobKey The key of the desired BLOB.
+     */
+    private void deleteFile(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       `jobId` should be non-null for permanent blob cache

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the last used index for the BLOBs so that the tracker can easily 
find out the last
+     * recently used BLOBs.
+     */
+    public void update(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        synchronized (lock) {
+            caches.get(Tuple2.of(jobId, blobKey));
+        }
+    }
+
+    /** Unregister all the tracked BLOBs related to given job. */
+    public void unregisterJob(JobID jobId) {

Review comment:
       I prefer the name to be `untrackAll` for easier understanding and also 
because there is no `registerJob()`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       `jobId` should be non-null for permanent blob cache

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {

Review comment:
       `jobId` should be non-null for permanent blob cache

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -247,6 +404,7 @@ public void run() {
 
                         boolean success = false;
                         try {
+                            blobCacheSizeTracker.unregisterJob(jobId);

Review comment:
       Can we do it in `PermanentBlobCache#releaseJob()`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);

Review comment:
       `put` -> `putIfAbsent` and increate total only if it is newly added

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {

Review comment:
       The size is never expected to be lesser than 0. We should check it 
instead of filter it.
   It's also Ok to not check it because it was already checked on `track`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {
+        checkNotNull(blobKey);
+
+        untrack(Tuple2.of(jobId, blobKey));
+    }
+
+    /**
+     * Update the last used index for the BLOBs so that the tracker can easily 
find out the last
+     * recently used BLOBs.
+     */
+    public void update(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       `jobId` should be non-null for permanent blob cache

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first
+ * and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+    private static final int INITIAL_SIZE = 10_000;
+
+    private final Object lock = new Object();
+
+    protected final long sizeLimit;
+
+    @GuardedBy("lock")
+    private long total;
+
+    @GuardedBy("lock")
+    private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+    @GuardedBy("lock")
+    protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+    public BlobCacheSizeTracker(long sizeLimit) {
+        this.sizeLimit = sizeLimit;
+        this.total = 0L;
+        this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+        this.blobKeyByJob = new HashMap<>();
+    }
+
+    /**
+     * Check the size limit and return the files to delete.
+     *
+     * @param size size of the target file intended to put into cache
+     * @return list of files to delete before saving the target file
+     */
+    public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+            long current = total;
+
+            for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : 
caches.entrySet()) {
+                if (current + size > sizeLimit) {
+                    blobsToDelete.add(entry.getKey());
+                    current -= entry.getValue();
+                }
+            }
+
+            return blobsToDelete;
+        }
+    }
+
+    /** Register the target file to the tracker. */
+    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(blobKey);
+        checkArgument(size >= 0);
+
+        synchronized (lock) {
+            caches.put(Tuple2.of(jobId, blobKey), size);
+            if (jobId != null) {
+                blobKeyByJob.computeIfAbsent(jobId, ignore -> new 
HashSet<>()).add(blobKey);
+            }
+            total += size;
+            if (total > sizeLimit) {
+                LOG.warn(
+                        "The overall size of ShuffleDescriptors in 
PermanentBlobCache exceeds the limit. "
+                                + "Limit: {}, current: {}. The size of next 
ShuffleDescriptors: {}.",
+                        sizeLimit,
+                        total,
+                        size);
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(Tuple2<JobID, BlobKey> key) {
+        checkNotNull(key);
+
+        synchronized (lock) {
+            if (key.f0 != null) {
+                blobKeyByJob.computeIfAbsent(key.f0, ignore -> new 
HashSet<>()).remove(key.f1);
+            }
+            Long size = caches.remove(key);
+            if (size != null && size >= 0) {
+                total -= size;
+            }
+        }
+    }
+
+    /** Remove the BLOB from the tracker. */
+    public void untrack(@Nullable JobID jobId, BlobKey blobKey) {

Review comment:
       It can be private




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to