Thesharing commented on a change in pull request #16498: URL: https://github.com/apache/flink/pull/16498#discussion_r677282028
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobCacheSizeTracker; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobCache; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class DefaultExecutionGraphDeploymentWithSizeLimitTest + extends DefaultExecutionGraphDeploymentWithBlobCacheTest { + + @Before + @Override + public void setupBlobServer() throws IOException { + Configuration config = new Configuration(); + // Always offload the serialized JobInformation, TaskInformation and cached + // ShuffleDescriptors + config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); + blobServer = new BlobServer(config, new VoidBlobStore()); + 
blobServer.start(); + blobWriter = blobServer; + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); + // Set the size limit of the blob cache to 1 + BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L); + blobCache = + new PermanentBlobCache( + config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker); + } + + @After + @Override + public void shutdownBlobServer() throws IOException { + if (blobServer != null) { + blobServer.close(); + } + } + + @Test + public void testDeployTasksWithMinimumSizeLimit() throws Exception { + + final int numberOfVertices = 4; + final int parallelism = 10; + + final ExecutionGraph eg = createAndSetupExecutionGraph(numberOfVertices, parallelism); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final BlockingQueue<TaskDeploymentDescriptor> tdds = + new ArrayBlockingQueue<>(numberOfVertices * parallelism); + taskManagerGateway.setSubmitConsumer( + FunctionUtils.uncheckedConsumer( + taskDeploymentDescriptor -> { + taskDeploymentDescriptor.loadBigData(blobCache); + tdds.offer(taskDeploymentDescriptor); + })); + + for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + + assertEquals(ExecutionState.CREATED, ev.getExecutionState()); + + LogicalSlot slot = + new TestingLogicalSlotBuilder() + .setTaskManagerGateway(taskManagerGateway) + .createTestingLogicalSlot(); + ev.getCurrentExecutionAttempt() + .registerProducedPartitions(slot.getTaskManagerLocation(), true) + .get(); + ev.deployToSlot(slot); + assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState()); + + TaskDeploymentDescriptor tdd = tdds.take(); + assertNotNull(tdd); + + List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates(); + assertEquals(ev.getAllConsumedPartitionGroups().size(), igdds.size()); + + if (igdds.size() > 0) { + assertShuffleDescriptorsEqual(igdds.get(0), ev.getConsumedPartitionGroup(0)); + } + } + } + } + + private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, int parallelism) + throws JobException, JobExecutionException { + + final List<JobVertex> vertices = new ArrayList<>(); + + for (int i = 0; i < numberOfVertices; i++) { + JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new JobVertexID()); + vertex.setParallelism(parallelism); + vertex.setInvokableClass(BatchTask.class); + vertices.add(vertex); + } + + for (int i = 1; i < numberOfVertices; i++) { + vertices.get(i) + .connectNewDataSetAsInput( + vertices.get(i - 1), + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + } + + final JobGraph jobGraph = + JobGraphTestUtils.batchJobGraph(vertices.toArray(new JobVertex[0])); + + final DirectScheduledExecutorService executor = new DirectScheduledExecutorService(); + final DefaultExecutionGraph eg = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .setFutureExecutor(executor) + .setIoExecutor(executor) + .setBlobWriter(blobWriter) + .build(); + + eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + return eg; + } + + private static void assertShuffleDescriptorsEqual( Review comment: Thank you for pointing this out. Resolved. 
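(Note for readers of this thread: the setup quoted above configures `new BlobCacheSizeTracker(1L)`, i.e. the cache may hold at most one byte worth of blobs. Below is a minimal, test-style sketch of what that limit forces, using only the `BlobCacheSizeTracker` API quoted in these hunks. The class and method names are made up for illustration, and it assumes the same `org.apache.flink.runtime.blob` package as the quoted tests so the package-private `BlobKey.createKey` factory is accessible.)

```java
package org.apache.flink.runtime.blob;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

/** Illustration only: eviction behaviour of a tracker whose size limit is 1. */
public class SizeLimitOneIllustration {

    @Test
    public void secondBlobEvictsTheFirst() {
        // Size limit of 1, as in the test setup quoted above.
        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(1L);

        JobID jobId = new JobID();
        BlobKey first = BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB);
        BlobKey second = BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB);

        // Cache the ShuffleDescriptor blob for the first deployed task.
        tracker.track(jobId, first, 1);

        // Before the blob for the next task is cached, the limit check
        // reports the previously cached blob as the entry to delete.
        assertThat(tracker.checkLimit(1), contains(Tuple2.of(jobId, first)));

        // The cache then deletes that file, untracks it and tracks the new blob,
        // so every new deployment replaces the previously cached descriptor.
        tracker.untrack(jobId, first);
        tracker.track(jobId, second, 1);
    }
}
```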
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobCacheSizeTracker; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobCache; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class DefaultExecutionGraphDeploymentWithSizeLimitTest + extends DefaultExecutionGraphDeploymentWithBlobCacheTest { + + @Before + @Override + public void setupBlobServer() throws IOException { + Configuration config = new Configuration(); + // Always offload the serialized JobInformation, TaskInformation and cached + // ShuffleDescriptors + config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); + blobServer = new BlobServer(config, new VoidBlobStore()); + 
blobServer.start(); + blobWriter = blobServer; + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); + // Set the size limit of the blob cache to 1 + BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L); + blobCache = + new PermanentBlobCache( + config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker); + } + + @After + @Override + public void shutdownBlobServer() throws IOException { + if (blobServer != null) { + blobServer.close(); + } + } + + @Test + public void testDeployTasksWithMinimumSizeLimit() throws Exception { Review comment: Because in `DefaultExecutionGraphDeploymentTest#testBuildDeploymentDescriptor` it only deploys one task. It cannot test the scenario that the ShuffleDescriptor blob cache is replaced every time a new task is deployed since the size limit is 1. There's another option that we can modify `testBuildDeploymentDescriptor` to deploy all the tasks. WDYT? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.blob.BlobServerPutTest.put; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link BlobCacheSizeTracker}. 
*/ +public class BlobCacheSizeTrackerTest extends TestLogger { + + private static final int blobSize = 10_000; + + private static final Random random = new Random(); + + private static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUp() throws IOException { + temporaryFolder.create(); + } + + @AfterClass + public static void cleanUp() throws IOException { + temporaryFolder.delete(); + } + + @Test + public void testSizeLimitTrackerStatusUpdate() { + BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L); + List<JobID> jobIds = new ArrayList<>(); + List<BlobKey> blobKeys = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + jobIds.add(new JobID()); + blobKeys.add(BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB)); + } + for (int i = 0; i < 5; i++) { + tracker.track(jobIds.get(i), blobKeys.get(i), 1); + } + tracker.update(jobIds.get(1), blobKeys.get(1)); + tracker.update(jobIds.get(2), blobKeys.get(2)); + + List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2); + + assertThat( + blobsToDelete, + containsInAnyOrder( + Tuple2.of(jobIds.get(0), blobKeys.get(0)), + Tuple2.of(jobIds.get(3), blobKeys.get(3)))); + } + + @Test + public void testTrackPermanentBlobCache() throws Exception { Review comment: Moved to `PermanentBlobCacheSizeLimitTest`, which is a test for the feature. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -247,6 +404,7 @@ public void run() { boolean success = false; try { + blobCacheSizeTracker.unregisterJob(jobId); Review comment: I'm not sure about that. `PermanentBlobCache#releaseJob()` only minus a ref of the `jobRefCounter`. A job is truly released when the ref is zero and the time has passed the `cleanupInterval`. For example, if the TM has only one slot, the blob cache will be removed every time a task is finished. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { return getFileInternal(jobId, key); } + /** + * Returns the content of the file for the BLOB with the provided job ID the blob key. + * + * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not + * in the cache, the method will try to download it from the HA store, or directly from the + * {@link BlobServer}. + * + * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in + * the same write lock as the file is accessed. This avoids the scenario that the path is + * returned as the file is deleted concurrently by other threads. + * + * @param jobId ID of the job this blob belongs to + * @param blobKey BLOB key associated with the requested file + * @return The content of the BLOB. + * @throws java.io.FileNotFoundException if the BLOB does not exist; + * @throws IOException if any other error occurs when retrieving the file. 
+ */ + @Override + public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException { + checkNotNull(jobId); + checkNotNull(blobKey); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); + + try { + if (localFile.exists()) { + blobCacheSizeTracker.update(jobId, blobKey); + return FileUtils.readAllBytes(localFile.toPath()); + } + } finally { + readWriteLock.readLock().unlock(); + } + + // first try the distributed blob store (if available) + // use a temporary file (thread-safe without locking) + File incomingFile = createTemporaryFilename(); + try { + try { + if (blobView.get(jobId, blobKey, incomingFile)) { + // now move the temp file to our local cache atomically + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } + } catch (Exception e) { + log.info( + "Failed to copy from blob store. Downloading from BLOB server instead.", e); + } + + final InetSocketAddress currentServerAddress = serverAddress; + + if (currentServerAddress != null) { + // fallback: download from the BlobServer + BlobClient.downloadFromBlobServer( + jobId, + blobKey, + incomingFile, + currentServerAddress, + blobClientConfig, + numFetchRetries); + + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } else { + throw new IOException( + "Cannot download from BlobServer, because the server address is unknown."); + } + + } finally { + // delete incomingFile from a failed download + if (!incomingFile.delete() && incomingFile.exists()) { + log.warn( + "Could not delete the staging file {} for blob key {} and job {}.", + incomingFile, + blobKey, + jobId); + } + } + } + + private void checkLimitAndMoveFile( + File incomingFile, + @Nullable JobID jobId, + BlobKey blobKey, + File localFile, + Logger log, + @Nullable BlobStore blobStore) + throws IOException { + + // Check the size limit and delete the files that exceeds the limit + final long sizeOfIncomingFile = incomingFile.length(); + final List<Tuple2<JobID, BlobKey>> blobsToDelete = + blobCacheSizeTracker.checkLimit(sizeOfIncomingFile); + + for (Tuple2<JobID, BlobKey> key : blobsToDelete) { + deleteFile(key.f0, key.f1); + blobCacheSizeTracker.untrack(key); + } + + // Move the file and register it to the tracker + BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, log, blobStore); + blobCacheSizeTracker.track(jobId, blobKey, localFile.length()); + } + + /** + * Delete the blob file with the given key. + * + * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) + * @param blobKey The key of the desired BLOB. + */ + private void deleteFile(@Nullable JobID jobId, BlobKey blobKey) { Review comment: Done. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first Review comment: Yes, you are right. Since we are not trying to hold WRITE/READ locks in the tracker any more, there will be no deadlocks here. And if `track/untrack/checkLimit` is not guarded by the WRITE/READ lock, there may be concurrent threads trying to check the limit in the middle of the `checkLimits` and `untrack`, making the track inconsistent. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobCacheSizeTracker; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobCache; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class DefaultExecutionGraphDeploymentWithSizeLimitTest Review comment: Thank you for pointing this out. Resolved. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { return getFileInternal(jobId, key); } + /** + * Returns the content of the file for the BLOB with the provided job ID the blob key. + * + * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not + * in the cache, the method will try to download it from the HA store, or directly from the + * {@link BlobServer}. + * + * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in + * the same write lock as the file is accessed. This avoids the scenario that the path is + * returned as the file is deleted concurrently by other threads. + * + * @param jobId ID of the job this blob belongs to + * @param blobKey BLOB key associated with the requested file + * @return The content of the BLOB. + * @throws java.io.FileNotFoundException if the BLOB does not exist; + * @throws IOException if any other error occurs when retrieving the file. 
+ */ + @Override + public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException { + checkNotNull(jobId); + checkNotNull(blobKey); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); + + try { + if (localFile.exists()) { + blobCacheSizeTracker.update(jobId, blobKey); + return FileUtils.readAllBytes(localFile.toPath()); + } + } finally { + readWriteLock.readLock().unlock(); + } + + // first try the distributed blob store (if available) + // use a temporary file (thread-safe without locking) + File incomingFile = createTemporaryFilename(); + try { + try { + if (blobView.get(jobId, blobKey, incomingFile)) { + // now move the temp file to our local cache atomically + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } + } catch (Exception e) { + log.info( + "Failed to copy from blob store. Downloading from BLOB server instead.", e); + } + + final InetSocketAddress currentServerAddress = serverAddress; + + if (currentServerAddress != null) { + // fallback: download from the BlobServer + BlobClient.downloadFromBlobServer( + jobId, + blobKey, + incomingFile, + currentServerAddress, + blobClientConfig, + numFetchRetries); + + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } else { + throw new IOException( + "Cannot download from BlobServer, because the server address is unknown."); + } + + } finally { + // delete incomingFile from a failed download + if (!incomingFile.delete() && incomingFile.exists()) { + log.warn( + "Could not delete the staging file {} for blob key {} and job {}.", + incomingFile, + blobKey, + jobId); + } + } + } + + private void checkLimitAndMoveFile( + File incomingFile, + @Nullable JobID jobId, + BlobKey blobKey, + File localFile, + Logger log, + @Nullable BlobStore blobStore) + throws IOException { + + // Check the size limit and delete the files that exceeds the limit + final long sizeOfIncomingFile = incomingFile.length(); + final List<Tuple2<JobID, BlobKey>> blobsToDelete = + blobCacheSizeTracker.checkLimit(sizeOfIncomingFile); + + for (Tuple2<JobID, BlobKey> key : blobsToDelete) { + deleteFile(key.f0, key.f1); + blobCacheSizeTracker.untrack(key); Review comment: Since here we use a for-loop, there will be no infinite loop here. The only risk is that we will try to delete the blob every time a new blob inserts until the deletion is successful. I'll make `deleteFile` return the result of deletion and won't untrack the blob if deletion is failed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.blob.BlobServerPutTest.put; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link BlobCacheSizeTracker}. */ +public class BlobCacheSizeTrackerTest extends TestLogger { + + private static final int blobSize = 10_000; + + private static final Random random = new Random(); + + private static final TemporaryFolder temporaryFolder = new TemporaryFolder(); Review comment: Thank you for pointing this out. I get to learn about how `@Rule` works. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.blob.BlobServerPutTest.put; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link BlobCacheSizeTracker}. */ +public class BlobCacheSizeTrackerTest extends TestLogger { + + private static final int blobSize = 10_000; + + private static final Random random = new Random(); + + private static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUp() throws IOException { + temporaryFolder.create(); + } + + @AfterClass + public static void cleanUp() throws IOException { + temporaryFolder.delete(); + } + + @Test + public void testSizeLimitTrackerStatusUpdate() { + BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L); + List<JobID> jobIds = new ArrayList<>(); + List<BlobKey> blobKeys = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + jobIds.add(new JobID()); + blobKeys.add(BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB)); + } + for (int i = 0; i < 5; i++) { + tracker.track(jobIds.get(i), blobKeys.get(i), 1); + } + tracker.update(jobIds.get(1), blobKeys.get(1)); + tracker.update(jobIds.get(2), blobKeys.get(2)); + + List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2); + + assertThat( + blobsToDelete, + containsInAnyOrder( + Tuple2.of(jobIds.get(0), blobKeys.get(0)), + Tuple2.of(jobIds.get(3), blobKeys.get(3)))); + } + + @Test + public void testTrackPermanentBlobCache() throws Exception { + + final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB; + + final Configuration config = new Configuration(); + config.setString( + BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + // The size limit is the size of 2 blobs + int sizeLimit = 2; + + try (BlobServer server = new BlobServer(config, new VoidBlobStore()); + BlobCacheService cache = + initBlobCacheServiceWithSizeLimit( + config, + new VoidBlobStore(), + new InetSocketAddress("localhost", server.getPort()), + sizeLimit * blobSize)) { + + server.start(); + + // Initialize the data structures + JobID[] jobId = new JobID[4]; + byte[][] data = new byte[4][blobSize]; + BlobKey[] key = new BlobKey[4]; + File[] file = new File[4]; + + // Prepare the blob data + byte[] blobData = new byte[blobSize]; + 
random.nextBytes(blobData); + + for (int i = 0; i < 4; i++) { + jobId[i] = new JobID(); + data[i] = Arrays.copyOf(blobData, blobData.length); + data[i][i] ^= 1; + } + + // Put the first, the second and the the third BLOB into the blob server one by one + // and let the cache retrieve the data from the blob + for (int i = 0; i < 3; i++) { + key[i] = put(server, jobId[i], data[i], blobType); + assertNotNull(key[i]); + + readFileAndVerifyContent(cache, jobId[i], key[i], data[i]); + file[i] = getFile(cache, jobId[i], key[i]); + assertTrue(file[i].exists()); + } + + // Since the size limit of the cache is the size of 2 blobs, + // the first blob is removed and the second one remains + assertFalse(file[0].exists()); + assertTrue(file[1].exists()); + + // Get the second blob once again, make the third blob to be the last recently used + readFileAndVerifyContent(cache, jobId[1], key[1], data[1]); + + // Then get the fourth blob, make sure the third blob is replaced. + // Here we cannot get the first blob again, because for transient blob, + // BlobServer will delete the blob once it's transmitted. + key[3] = put(server, jobId[3], data[3], blobType); + readFileAndVerifyContent(cache, jobId[3], key[3], data[3]); + file[3] = getFile(cache, jobId[3], key[3]); + + assertTrue(file[1].exists()); + assertTrue(file[3].exists()); + assertFalse(file[2].exists()); + } + } + + @Test + public void testGetPermanentCacheWithSizeLimitConcurrently() throws Exception { Review comment: Moved to `PermanentBlobCacheSizeLimitTest`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. 
To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. + * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); + if (jobId != null) { + blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); + } + total += size; + if (total > sizeLimit) { + LOG.warn( + "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. " + + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.", + sizeLimit, + total, + size); + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(Tuple2<JobID, BlobKey> key) { + checkNotNull(key); + + synchronized (lock) { + if (key.f0 != null) { + blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1); + } + Long size = caches.remove(key); + if (size != null && size >= 0) { + total -= size; + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(@Nullable JobID jobId, BlobKey blobKey) { + checkNotNull(blobKey); + + untrack(Tuple2.of(jobId, blobKey)); + } + + /** + * Update the last used index for the BLOBs so that the tracker can easily find out the last + * recently used BLOBs. + */ + public void update(@Nullable JobID jobId, BlobKey blobKey) { + checkNotNull(blobKey); + + synchronized (lock) { + caches.get(Tuple2.of(jobId, blobKey)); + } + } + + /** Unregister all the tracked BLOBs related to given job. */ + public void unregisterJob(JobID jobId) { + checkNotNull(jobId); + + synchronized (lock) { + for (BlobKey blobKey : blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>())) { + untrack(jobId, blobKey); + } + blobKeyByJob.remove(jobId); + } + } + + @VisibleForTesting + Set<BlobKey> getBlobKeysByJobId(JobID jobId) { Review comment: It's not used in production. It's used in `BlobCacheCleanupTest#checkCacheSizeTracker`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first Review comment: The comment has been improved slightly. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java ########## @@ -45,4 +46,21 @@ * @throws IOException if any other error occurs when retrieving the file */ File getFile(JobID jobId, PermanentBlobKey key) throws IOException; + + /** + * Directly read the file associated with the provided job ID and blob key. + * + * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read when Review comment: Yes. I agree that the description here is not accurate. Maybe we should speak about the concurrency in the descriptions of `readFile` in `PermanentBlobCache` rather than here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -102,6 +123,9 @@ public PermanentBlobCache( this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; this.cleanupTimer.schedule( new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval); + + // Configure the size limit tracker Review comment: Agreed. Removed. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. + * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { Review comment: Sorry for being careless. I've noticed that. Resolved. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSizeLimitTest.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobCacheSizeTracker; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobCache; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class DefaultExecutionGraphDeploymentWithSizeLimitTest + extends DefaultExecutionGraphDeploymentWithBlobCacheTest { + + @Before + @Override + public void setupBlobServer() throws IOException { + Configuration config = new Configuration(); + // Always offload the serialized JobInformation, TaskInformation and cached + // ShuffleDescriptors + config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); + blobServer = new BlobServer(config, new VoidBlobStore()); + blobServer.start(); + blobWriter = blobServer; + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); + // Set the size limit of the blob cache to 1 + BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L); + blobCache = + new 
PermanentBlobCache( + config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker); + } + + @After + @Override + public void shutdownBlobServer() throws IOException { Review comment: Yes. Sorry for being careless. I've removed it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. 
+ * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); + if (jobId != null) { + blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); + } + total += size; + if (total > sizeLimit) { + LOG.warn( + "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. " + + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.", + sizeLimit, + total, + size); + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(Tuple2<JobID, BlobKey> key) { + checkNotNull(key); + + synchronized (lock) { + if (key.f0 != null) { + blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1); + } + Long size = caches.remove(key); + if (size != null && size >= 0) { Review comment: For sanity, I've added a `checkState` here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { return getFileInternal(jobId, key); } + /** + * Returns the content of the file for the BLOB with the provided job ID the blob key. + * + * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not + * in the cache, the method will try to download it from the HA store, or directly from the + * {@link BlobServer}. + * + * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in + * the same write lock as the file is accessed. This avoids the scenario that the path is + * returned as the file is deleted concurrently by other threads. + * + * @param jobId ID of the job this blob belongs to + * @param blobKey BLOB key associated with the requested file + * @return The content of the BLOB. + * @throws java.io.FileNotFoundException if the BLOB does not exist; + * @throws IOException if any other error occurs when retrieving the file.
+ */ + @Override + public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException { + checkNotNull(jobId); + checkNotNull(blobKey); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); + + try { + if (localFile.exists()) { + blobCacheSizeTracker.update(jobId, blobKey); + return FileUtils.readAllBytes(localFile.toPath()); + } + } finally { + readWriteLock.readLock().unlock(); + } + + // first try the distributed blob store (if available) + // use a temporary file (thread-safe without locking) + File incomingFile = createTemporaryFilename(); + try { + try { + if (blobView.get(jobId, blobKey, incomingFile)) { + // now move the temp file to our local cache atomically + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } + } catch (Exception e) { + log.info( + "Failed to copy from blob store. Downloading from BLOB server instead.", e); + } + + final InetSocketAddress currentServerAddress = serverAddress; + + if (currentServerAddress != null) { + // fallback: download from the BlobServer + BlobClient.downloadFromBlobServer( + jobId, + blobKey, + incomingFile, + currentServerAddress, + blobClientConfig, + numFetchRetries); + + readWriteLock.writeLock().lock(); + try { + checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null); + return FileUtils.readAllBytes(localFile.toPath()); + } finally { + readWriteLock.writeLock().unlock(); + } + } else { + throw new IOException( + "Cannot download from BlobServer, because the server address is unknown."); + } + + } finally { + // delete incomingFile from a failed download + if (!incomingFile.delete() && incomingFile.exists()) { + log.warn( + "Could not delete the staging file {} for blob key {} and job {}.", + incomingFile, + blobKey, + jobId); + } + } + } + + private void checkLimitAndMoveFile( + File incomingFile, + @Nullable JobID jobId, Review comment: Done. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.blob.BlobServerPutTest.put; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link BlobCacheSizeTracker}. */ +public class BlobCacheSizeTrackerTest extends TestLogger { + + private static final int blobSize = 10_000; + + private static final Random random = new Random(); + + private static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUp() throws IOException { + temporaryFolder.create(); + } + + @AfterClass + public static void cleanUp() throws IOException { + temporaryFolder.delete(); + } + + @Test + public void testSizeLimitTrackerStatusUpdate() { Review comment: I get it. I'll add more tests for functions of `BlobCacheSizeTracker`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. + * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); + if (jobId != null) { + blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); + } + total += size; + if (total > sizeLimit) { + LOG.warn( + "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. " + + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.", + sizeLimit, + total, + size); + } + } + } + + /** Remove the BLOB from the tracker. 
*/ + public void untrack(Tuple2<JobID, BlobKey> key) { + checkNotNull(key); + + synchronized (lock) { + if (key.f0 != null) { + blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1); + } + Long size = caches.remove(key); + if (size != null && size >= 0) { + total -= size; + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(@Nullable JobID jobId, BlobKey blobKey) { Review comment: Thank you for pointing this out. I shouldn't ignore this. Resolved. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. 
+ * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); + if (jobId != null) { + blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); + } + total += size; + if (total > sizeLimit) { + LOG.warn( + "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. " + + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.", + sizeLimit, + total, + size); + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(Tuple2<JobID, BlobKey> key) { + checkNotNull(key); + + synchronized (lock) { + if (key.f0 != null) { + blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1); + } + Long size = caches.remove(key); + if (size != null && size >= 0) { + total -= size; + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(@Nullable JobID jobId, BlobKey blobKey) { + checkNotNull(blobKey); + + untrack(Tuple2.of(jobId, blobKey)); + } + + /** + * Update the last used index for the BLOBs so that the tracker can easily find out the last + * recently used BLOBs. + */ + public void update(@Nullable JobID jobId, BlobKey blobKey) { + checkNotNull(blobKey); + + synchronized (lock) { + caches.get(Tuple2.of(jobId, blobKey)); + } + } + + /** Unregister all the tracked BLOBs related to given job. */ + public void unregisterJob(JobID jobId) { Review comment: Thank you for suggesting a better name. I'd been trying to come up with a good one for a long time. Resolved. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ########## @@ -90,6 +96,21 @@ public PermanentBlobCache( @Nullable final InetSocketAddress serverAddress) throws IOException { + this( + blobClientConfig, + blobView, + serverAddress, + new BlobCacheSizeTracker(MemorySize.ofMebiBytes(100).getBytes())); Review comment: Thank you for providing this suggestion. Done. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. + * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); + if (jobId != null) { + blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); + } + total += size; + if (total > sizeLimit) { + LOG.warn( + "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds the limit. " + + "Limit: {}, current: {}. The size of next ShuffleDescriptors: {}.", + sizeLimit, + total, + size); + } + } + } + + /** Remove the BLOB from the tracker. 
*/ + public void untrack(Tuple2<JobID, BlobKey> key) { + checkNotNull(key); + + synchronized (lock) { + if (key.f0 != null) { + blobKeyByJob.computeIfAbsent(key.f0, ignore -> new HashSet<>()).remove(key.f1); + } + Long size = caches.remove(key); + if (size != null && size >= 0) { + total -= size; + } + } + } + + /** Remove the BLOB from the tracker. */ + public void untrack(@Nullable JobID jobId, BlobKey blobKey) { + checkNotNull(blobKey); + + untrack(Tuple2.of(jobId, blobKey)); + } + + /** + * Update the last used index for the BLOBs so that the tracker can easily find out the last + * recently used BLOBs. + */ + public void update(@Nullable JobID jobId, BlobKey blobKey) { Review comment: Done. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order for the files in the + * cache. When new files are intended to be put into cache, {@code checkLimit} is called to query + * the files should be removed. This tracker maintains a lock to avoid concurrent modification. To + * avoid the deadlock, make sure that hold the READ/WRITE lock in {@link PermanentBlobCache} first + * and then hold the lock here. + */ +public class BlobCacheSizeTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BlobCacheSizeTracker.class); + + private static final int INITIAL_SIZE = 10_000; + + private final Object lock = new Object(); + + protected final long sizeLimit; + + @GuardedBy("lock") + private long total; + + @GuardedBy("lock") + private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches; + + @GuardedBy("lock") + protected final HashMap<JobID, Set<BlobKey>> blobKeyByJob; + + public BlobCacheSizeTracker(long sizeLimit) { + this.sizeLimit = sizeLimit; + this.total = 0L; + this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true); + this.blobKeyByJob = new HashMap<>(); + } + + /** + * Check the size limit and return the files to delete. 
+ * + * @param size size of the target file intended to put into cache + * @return list of files to delete before saving the target file + */ + public List<Tuple2<JobID, BlobKey>> checkLimit(long size) { + checkArgument(size >= 0); + + synchronized (lock) { + List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>(); + + long current = total; + + for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry : caches.entrySet()) { + if (current + size > sizeLimit) { + blobsToDelete.add(entry.getKey()); + current -= entry.getValue(); + } + } + + return blobsToDelete; + } + } + + /** Register the target file to the tracker. */ + public void track(@Nullable JobID jobId, BlobKey blobKey, long size) { + checkNotNull(blobKey); + checkArgument(size >= 0); + + synchronized (lock) { + caches.put(Tuple2.of(jobId, blobKey), size); Review comment: Alright. If a BLOB is already tracked, the newly tracked duplicate will be ignored and a warning will be logged.
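For readers following the thread, the eviction and duplicate-handling behaviour discussed above can be exercised with a small unit test. The sketch below is illustrative only and is not part of the pull request; it assumes the `BlobCacheSizeTracker` API exactly as shown in the diffs above (constructor taking a size limit, `track`, `update`, `checkLimit`), the public no-argument `PermanentBlobKey` constructor used elsewhere in Flink's tests, and, for the second test, the ignore-duplicates behaviour described in the last reply rather than the code shown in the hunk.

```java
package org.apache.flink.runtime.blob;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** Illustrative sketch only (not part of the PR): exercises the tracker behaviour discussed above. */
public class BlobCacheSizeTrackerSketchTest {

    @Test
    public void leastRecentlyUsedBlobIsEvictedFirst() {
        // Limit of 10 bytes: two 5-byte BLOBs fit, a third 5-byte BLOB forces one eviction.
        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(10L);
        JobID jobId = new JobID();
        PermanentBlobKey first = new PermanentBlobKey();
        PermanentBlobKey second = new PermanentBlobKey();

        tracker.track(jobId, first, 5L);
        tracker.track(jobId, second, 5L);

        // Touch the first BLOB so that the second one becomes the least recently used.
        tracker.update(jobId, first);

        List<Tuple2<JobID, BlobKey>> toDelete = tracker.checkLimit(5L);

        assertEquals(1, toDelete.size());
        assertEquals(Tuple2.of(jobId, second), toDelete.get(0));
    }

    @Test
    public void duplicateTrackingIsIgnored() {
        // Relies on the behaviour described in the reply above: tracking an already-tracked
        // BLOB is ignored (with a warning), so the total stays at 5 bytes and a further
        // 5-byte BLOB still fits without any eviction.
        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(10L);
        JobID jobId = new JobID();
        PermanentBlobKey key = new PermanentBlobKey();

        tracker.track(jobId, key, 5L);
        tracker.track(jobId, key, 5L);

        assertTrue(tracker.checkLimit(5L).isEmpty());
    }
}
```

Keeping the assertions in terms of `checkLimit`, rather than inspecting the tracker's private fields, keeps the sketch independent of the internal `LinkedHashMap` bookkeeping while still verifying the LRU order it maintains.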
