zhuzhurk commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677982673
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.BlobKey.BlobType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
+public class BlobCacheSizeTrackerTest extends TestLogger {
+
+    private BlobCacheSizeTracker tracker;
+    private JobID jobId;
+    private BlobKey blobKey;
+
+    @Before
+    public void setup() {
+        tracker = new BlobCacheSizeTracker(5L);
+        jobId = new JobID();
+        blobKey = BlobKey.createKey(BlobType.PERMANENT_BLOB);
+
+        tracker.track(jobId, blobKey, 3L);
+    }
+
+    @Test
+    public void testCheckLimit() {
+        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(3L);
+
+        assertEquals(1, keys.size());
+        assertEquals(jobId, keys.get(0).f0);
+        assertEquals(blobKey, keys.get(0).f1);
+    }
+
+    @Test
+    public void testZero() {

Review comment:
       The name is confusing, and I don't think this is expected to happen in production. Maybe it should be `testCheckNonExceededLimit` and check a small size. And I really hope this kind of change can be examined and polished in advance.
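       As a minimal sketch of that suggestion (assuming, as the hunks above imply, that `checkLimit` returns an empty list whenever the requested size still fits under the tracker's limit), the renamed test could look like this:

           @Test
           public void testCheckNonExceededLimit() {
               // 3 bytes are already tracked against a limit of 5 (see setup()),
               // so asking to add 1 more byte should not require any eviction.
               List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(1L);

               assertEquals(0, keys.size());
           }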
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
##########
@@ -0,0 +1,168 @@
+    @Test
+    public void testZero() {
+        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(0L);
+
+        assertEquals(0, keys.size());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCheckNegative() {
+        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L);

Review comment:
       Why do we need to create a new tracker?
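       A possible simplification, reusing the tracker built in `setup()` (a sketch; the hunk is cut off before the method body, so the negative-size call to `checkLimit` is assumed):

           @Test(expected = IllegalArgumentException.class)
           public void testCheckNegative() {
               // Reuse the tracker from setup() instead of constructing a new one;
               // a negative size should be rejected with an IllegalArgumentException.
               tracker.checkLimit(-1L);
           }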
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for using {@link BlobCacheSizeTracker} to track the size of BLOBs in {@link
+ * PermanentBlobCache}. When new BLOBs are about to be inserted and the size limit would be
+ * exceeded, {@link BlobCacheSizeTracker} will provide the excess BLOBs for {@link
+ * PermanentBlobCache} to delete.
+ */
+public class PermanentBlobCacheSizeLimitTest {
+
+    private static final int BLOB_SIZE = 10_000;
+
+    private static final Random RANDOM = new Random();
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcess() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        // The size limit is the size of 2 blobs
+        int sizeLimit = 2;
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config,
+                                new VoidBlobStore(),
+                                new InetSocketAddress("localhost", server.getPort()),
+                                sizeLimit * BLOB_SIZE)) {
+
+            server.start();
+
+            // Initialize the data structures
+            JobID[] jobId = new JobID[4];
+            byte[][] data = new byte[4][BLOB_SIZE];
+            BlobKey[] key = new BlobKey[4];
+            File[] file = new File[4];
+
+            // Prepare the blob data
+            byte[] blobData = new byte[BLOB_SIZE];
+            RANDOM.nextBytes(blobData);
+
+            for (int i = 0; i < 4; i++) {
+                jobId[i] = new JobID();
+                data[i] = Arrays.copyOf(blobData, blobData.length);
+                data[i][i] ^= 1;
+            }
+
+            // Put the first, the second and the third BLOB into the blob server one by one
+            // and let the cache retrieve the data from the blob
+            for (int i = 0; i < 3; i++) {
+                key[i] = put(server, jobId[i], data[i], blobType);
+                assertNotNull(key[i]);
+
+                readFileAndVerifyContent(cache, jobId[i], key[i], data[i]);
+                file[i] = getFile(cache, jobId[i], key[i]);
+                assertTrue(file[i].exists());
+            }
+
+            // Since the size limit of the cache is the size of 2 blobs,
+            // the first blob is removed and the second one remains
+            assertFalse(file[0].exists());
+            assertTrue(file[1].exists());
+
+            // Get the second blob once again, making the third blob the least recently used
+            readFileAndVerifyContent(cache, jobId[1], key[1], data[1]);
+
+            // Then get the fourth blob, make sure the third blob is replaced.
+            // Here we cannot get the first blob again, because for transient blobs,
+            // the BlobServer deletes the blob once it has been transmitted.
+            key[3] = put(server, jobId[3], data[3], blobType);
+            readFileAndVerifyContent(cache, jobId[3], key[3], data[3]);
+            file[3] = getFile(cache, jobId[3], key[3]);
+
+            assertTrue(file[1].exists());
+            assertTrue(file[3].exists());
+            assertFalse(file[2].exists());
+        }
+    }
+
+    @Test
+    public void testTrackSizeLimitAndDeleteExcessConcurrently() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        final int concurrency = 3;
+        final ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+
+        // The size limit is the size of 2 blobs
+        final int sizeLimit = 2;
+
+        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
+                BlobCacheService cache =
+                        initBlobCacheServiceWithSizeLimit(
+                                config,
+                                new VoidBlobStore(),
+                                new InetSocketAddress("localhost", server.getPort()),
+                                sizeLimit * BLOB_SIZE)) {
+
+            server.start();
+
+            // Initialize the data structures
+            JobID[] jobId = new JobID[concurrency];
+            BlobKey[] key = new BlobKey[concurrency];
+            File[] file = new File[concurrency];
+
+            byte[] blobData = new byte[BLOB_SIZE];
+            RANDOM.nextBytes(blobData);
+
+            // Put all the BLOBs into the blob server one by one
+            for (int i = 0; i < concurrency; i++) {
+                jobId[i] = new JobID();
+
+                key[i] = put(server, jobId[i], blobData, blobType);
+                assertNotNull(key[i]);
+            }
+
+            List<CompletableFuture<Void>> futures = new ArrayList<>(concurrency);
+
+            // Get and visit the blob cache concurrently
+            for (int i = 0; i < concurrency; i++) {
+                int idx = i;
+                CompletableFuture<Void> future =
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    try {
+                                        file[idx] = getFile(cache, jobId[idx], key[idx]);
+                                        readFileAndVerifyContent(
+                                                cache, jobId[idx], key[idx], blobData);
+                                        return null;
+                                    } catch (IOException e) {
+                                        throw new CompletionException(e);
+                                    }
+                                },
+                                executor);
+
+                futures.add(future);
+            }
+
+            CompletableFuture<Void> conjunctFuture = FutureUtils.waitForAll(futures);
+            conjunctFuture.get();
+
+            // Check how many cached blob files exist
+            int exists = 0, nonExists = 0;
+            for (int i = 0; i < concurrency; i++) {
+                if (file[i].exists()) {
+                    exists++;
+                } else {
+                    nonExists++;
+                }
+            }
+            assertEquals(sizeLimit, exists);
+            assertEquals(concurrency - sizeLimit, nonExists);
+        }
+    }
+
+    private static BlobCacheService initBlobCacheServiceWithSizeLimit(
+            final Configuration blobClientConfig,
+            final BlobView blobView,
+            @Nullable final InetSocketAddress serverAddress,
+            final long sizeListOfBlobCacheSizeTracker)

Review comment:
       Do you mean `sizeLimitOfBlobCacheSizeTracker`?
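       If the name is indeed a typo, a sketch of the corrected signature (only the parameter name changes):

           private static BlobCacheService initBlobCacheServiceWithSizeLimit(
                   final Configuration blobClientConfig,
                   final BlobView blobView,
                   @Nullable final InetSocketAddress serverAddress,
                   final long sizeLimitOfBlobCacheSizeTracker)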
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
##########
@@ -0,0 +1,251 @@
+    @Test
+    public void testTrackSizeLimitAndDeleteExcess() throws Exception {
+
+        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
+
+        final Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        // The size limit is the size of 2 blobs
+        int sizeLimit = 2;

Review comment:
       This is not a size limit but a blob count limit, maybe `maxAcceptedBlobCount`?
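       A sketch of how the quoted lines would read with that rename (the name itself is only the suggestion above, not existing API):

           // The cache may hold at most 2 blobs before older entries are evicted
           final int maxAcceptedBlobCount = 2;

           // The byte-based limit handed to the cache service is derived from the count:
           initBlobCacheServiceWithSizeLimit(
                   config,
                   new VoidBlobStore(),
                   new InetSocketAddress("localhost", server.getPort()),
                   maxAcceptedBlobCount * BLOB_SIZE);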
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]