Thesharing commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r678116676
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for using {@link BlobCacheSizeTracker} to track the size of BLOBs in {@link
+ * PermanentBlobCache}. When new BLOBs are about to be inserted and the size limit would be
+ * exceeded, {@link BlobCacheSizeTracker} provides the excess BLOBs for {@link PermanentBlobCache}
+ * to delete.
+ */
+public class PermanentBlobCacheSizeLimitTest {
+
+    private static final int BLOB_SIZE = 10_000;
+
+    private static final Random RANDOM = new Random();
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcess() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        // The size limit is the size of 2 blobs
+        int sizeLimit = 2;
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config,
+                                new VoidBlobStore(),
+                                new InetSocketAddress("localhost", server.getPort()),
+                                sizeLimit * BLOB_SIZE)) {
+
+            server.start();
+
+            // Initialize the data structures
+            JobID[] jobId = new JobID[4];
+            byte[][] data = new byte[4][BLOB_SIZE];
+            BlobKey[] key = new BlobKey[4];
+            File[] file = new File[4];
+
+            // Prepare the blob data
+            byte[] blobData = new byte[BLOB_SIZE];
+            RANDOM.nextBytes(blobData);
+
+            for (int i = 0; i < 4; i++) {
+                jobId[i] = new JobID();
+                data[i] = Arrays.copyOf(blobData, blobData.length);
+                data[i][i] ^= 1;
+            }
+
+            // Put the first, the second and the third BLOB into the blob server one by one
+            // and let the cache retrieve the data from the blob server
+            for (int i = 0; i < 3; i++) {
+                key[i] = put(server, jobId[i], data[i], blobType);
+                assertNotNull(key[i]);
+
+                readFileAndVerifyContent(cache, jobId[i], key[i], data[i]);
+                file[i] = getFile(cache, jobId[i], key[i]);
+                assertTrue(file[i].exists());
+            }
+
+            // Since the size limit of the cache is the size of 2 blobs,
+            // the first blob is removed and the second one remains
+            assertFalse(file[0].exists());
+            assertTrue(file[1].exists());
+
+            // Get the second blob once again, making the third blob the least recently used
+            readFileAndVerifyContent(cache, jobId[1], key[1], data[1]);
+
+            // Then get the fourth blob and make sure the third blob is evicted.
+            // Here we cannot get the first blob again, because for transient blobs,
+            // the BlobServer deletes the blob once it has been transmitted.
+            key[3] = put(server, jobId[3], data[3], blobType);
+            readFileAndVerifyContent(cache, jobId[3], key[3], data[3]);
+            file[3] = getFile(cache, jobId[3], key[3]);
+
+            assertTrue(file[1].exists());
+            assertTrue(file[3].exists());
+            assertFalse(file[2].exists());
+        }
+    }
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcessConcurrently() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        final int concurrency = 3;
+        final ExecutorService executor = Executors.newFixedThreadPool(concurrency);

Review comment:
       Thank you for pointing this out. I should've noticed issues like this. This test has been refactored.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
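Editor's note: for readers unfamiliar with the behavior the test above exercises, the following is a minimal, hypothetical sketch of size-limited, LRU-ordered tracking in the spirit of BlobCacheSizeTracker. The class and method names (LruSizeTrackerSketch, checkLimit, track, untrack, touch) are made up for illustration and are not the actual Flink API; the real tracker keys entries by JobID and BlobKey and is wired into PermanentBlobCache.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the actual Flink BlobCacheSizeTracker. */
class LruSizeTrackerSketch {
    private final long sizeLimit;
    private long total;
    // accessOrder = true: iteration visits the least recently accessed entry first.
    private final LinkedHashMap<String, Long> blobSizes = new LinkedHashMap<>(16, 0.75f, true);

    LruSizeTrackerSketch(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    /** Returns the keys that would have to be deleted before a blob of {@code newSize} fits. */
    synchronized List<String> checkLimit(long newSize) {
        List<String> toEvict = new ArrayList<>();
        long projected = total + newSize;
        for (Map.Entry<String, Long> entry : blobSizes.entrySet()) {
            if (projected <= sizeLimit) {
                break;
            }
            toEvict.add(entry.getKey());
            projected -= entry.getValue();
        }
        return toEvict;
    }

    /** Records a newly cached blob after the cache has deleted the evicted files. */
    synchronized void track(String key, long size) {
        Long previous = blobSizes.put(key, size);
        total += size - (previous == null ? 0 : previous);
    }

    /** Removes a blob from the tracker once its local file has been deleted. */
    synchronized void untrack(String key) {
        Long removed = blobSizes.remove(key);
        if (removed != null) {
            total -= removed;
        }
    }

    /** Marks a blob as recently used when the cache serves it again. */
    synchronized void touch(String key) {
        blobSizes.get(key); // get() on an access-ordered map moves the entry to the tail
    }
}

This mirrors the access pattern in testTrackSizeLimitAndDeleteExcess: with a limit of two blob sizes, caching a third blob evicts the first; re-reading the second blob refreshes it, so caching a fourth blob evicts the third rather than the second.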
