zhuzhurk commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677165668



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
##########
@@ -45,4 +46,21 @@
      * @throws IOException if any other error occurs when retrieving the file
      */
     File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
+
+    /**
+     * Directly read the file associated with the provided job ID and blob key.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read when

Review comment:
       >>> {@code readFile} makes sure that ..
   
   I think it's not very accurate because `readFile` does not guarantee that 
but it provides the possibility to achieve that, if the file reading and file 
retrieving happens in the same write lock.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and 
cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new 
BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, 
blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {

Review comment:
       It's not needed because it is implemented the same by 
`DefaultExecutionGraphDeploymentWithBlobServerTest` already

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and 
cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new 
BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, 
blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    @Test
+    public void testDeployTasksWithMinimumSizeLimit() throws Exception {

Review comment:
       Why do we need to add this test given that we already have 
`DefaultExecutionGraphDeploymentTest#testBuildDeploymentDescriptor`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest
+        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
+
+    @Before
+    @Override
+    public void setupBlobServer() throws IOException {
+        Configuration config = new Configuration();
+        // Always offload the serialized JobInformation, TaskInformation and 
cached
+        // ShuffleDescriptors
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+        blobWriter = blobServer;
+
+        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
blobServer.getPort());
+        // Set the size limit of the blob cache to 1
+        BlobCacheSizeTracker blobCacheSizeTracker = new 
BlobCacheSizeTracker(1L);
+        blobCache =
+                new PermanentBlobCache(
+                        config, new VoidBlobStore(), serverAddress, 
blobCacheSizeTracker);
+    }
+
+    @After
+    @Override
+    public void shutdownBlobServer() throws IOException {
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    @Test
+    public void testDeployTasksWithMinimumSizeLimit() throws Exception {
+
+        final int numberOfVertices = 4;
+        final int parallelism = 10;
+
+        final ExecutionGraph eg = 
createAndSetupExecutionGraph(numberOfVertices, parallelism);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final BlockingQueue<TaskDeploymentDescriptor> tdds =
+                new ArrayBlockingQueue<>(numberOfVertices * parallelism);
+        taskManagerGateway.setSubmitConsumer(
+                FunctionUtils.uncheckedConsumer(
+                        taskDeploymentDescriptor -> {
+                            taskDeploymentDescriptor.loadBigData(blobCache);
+                            tdds.offer(taskDeploymentDescriptor);
+                        }));
+
+        for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+
+                assertEquals(ExecutionState.CREATED, ev.getExecutionState());
+
+                LogicalSlot slot =
+                        new TestingLogicalSlotBuilder()
+                                .setTaskManagerGateway(taskManagerGateway)
+                                .createTestingLogicalSlot();
+                ev.getCurrentExecutionAttempt()
+                        
.registerProducedPartitions(slot.getTaskManagerLocation(), true)
+                        .get();
+                ev.deployToSlot(slot);
+                assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());
+
+                TaskDeploymentDescriptor tdd = tdds.take();
+                assertNotNull(tdd);
+
+                List<InputGateDeploymentDescriptor> igdds = 
tdd.getInputGates();
+                assertEquals(ev.getAllConsumedPartitionGroups().size(), 
igdds.size());
+
+                if (igdds.size() > 0) {
+                    assertShuffleDescriptorsEqual(igdds.get(0), 
ev.getConsumedPartitionGroup(0));
+                }
+            }
+        }
+    }
+
+    private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, 
int parallelism)
+            throws JobException, JobExecutionException {
+
+        final List<JobVertex> vertices = new ArrayList<>();
+
+        for (int i = 0; i < numberOfVertices; i++) {
+            JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new 
JobVertexID());
+            vertex.setParallelism(parallelism);
+            vertex.setInvokableClass(BatchTask.class);
+            vertices.add(vertex);
+        }
+
+        for (int i = 1; i < numberOfVertices; i++) {
+            vertices.get(i)
+                    .connectNewDataSetAsInput(
+                            vertices.get(i - 1),
+                            DistributionPattern.ALL_TO_ALL,
+                            ResultPartitionType.BLOCKING);
+        }
+
+        final JobGraph jobGraph =
+                JobGraphTestUtils.batchJobGraph(vertices.toArray(new 
JobVertex[0]));
+
+        final DirectScheduledExecutorService executor = new 
DirectScheduledExecutorService();
+        final DefaultExecutionGraph eg =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setFutureExecutor(executor)
+                        .setIoExecutor(executor)
+                        .setBlobWriter(blobWriter)
+                        .build();
+
+        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return eg;
+    }
+
+    private static void assertShuffleDescriptorsEqual(

Review comment:
       Looks to me a name `checkShuffleDescriptors` is more accurate.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultExecutionGraphDeploymentWithSizeLimitTest

Review comment:
       `DefaultExecutionGraphDeploymentWithSizeLimitTest` -> 
`DefaultExecutionGraphDeploymentWithBlobSizeLimitTest `

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order 
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit} 
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid 
concurrent modification. To
+ * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link 
PermanentBlobCache} first

Review comment:
       >>> To avoid the deadlock, make sure that hold the READ/WRITE lock in 
{@link PermanentBlobCache} first and then hold the lock here.
   
   I cannot come up with the case that how deadlocks can happen in this case. 
However, I can see that inconsistency can happen if track/untrack/checkLimit 
are not guarded by write/read lock of blob cache.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -102,6 +123,9 @@ public PermanentBlobCache(
         this.cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
         this.cleanupTimer.schedule(
                 new PermanentBlobCleanupTask(), cleanupInterval, 
cleanupInterval);
+
+        // Configure the size limit tracker

Review comment:
       This comment does not add extra info and thus is not needed I think

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) 
throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local 
cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or 
directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario 
that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            deleteFile(key.f0, key.f1);
+            blobCacheSizeTracker.untrack(key);

Review comment:
       We should not `untrack` the blob if the deletion failed.
   But I prefer to not block the new blob adding if this happens.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to