1996fanrui commented on code in PR #23211:
URL: https://github.com/apache/flink/pull/23211#discussion_r1299992229


##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws 
IOException {
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
12345);
 
         try (PermanentBlobCache cache =
-                new PermanentBlobCache(
-                        config, temporaryFolder.newFolder(), new 
VoidBlobStore(), serverAddress)) {
+                TestingBlobUtils.createPermanentCache(tempDir, config, 
serverAddress)) {
 
             // register once
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // register a second time
             cache.registerJob(jobId);
-            assertEquals(2, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release once
             cache.releaseJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release a second time
             long cleanupLowerBound =
                     System.currentTimeMillis() + 
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
             cache.releaseJob(jobId);
-            assertEquals(0, cache.getJobRefCounters().get(jobId).references);
-            assertThat(
-                    cache.getJobRefCounters().get(jobId).keepUntil,
-                    greaterThanOrEqualTo(cleanupLowerBound));
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);

Review Comment:
   ```suggestion
               
assertThat(cache.getJobRefCounters().get(jobId).references).isZero();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws 
IOException {
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
12345);
 
         try (PermanentBlobCache cache =
-                new PermanentBlobCache(
-                        config, temporaryFolder.newFolder(), new 
VoidBlobStore(), serverAddress)) {
+                TestingBlobUtils.createPermanentCache(tempDir, config, 
serverAddress)) {
 
             // register once
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);

Review Comment:
   ```suggestion
               
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws 
IOException {
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
12345);
 
         try (PermanentBlobCache cache =
-                new PermanentBlobCache(
-                        config, temporaryFolder.newFolder(), new 
VoidBlobStore(), serverAddress)) {
+                TestingBlobUtils.createPermanentCache(tempDir, config, 
serverAddress)) {
 
             // register once
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // register a second time
             cache.registerJob(jobId);
-            assertEquals(2, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release once
             cache.releaseJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release a second time
             long cleanupLowerBound =
                     System.currentTimeMillis() + 
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
             cache.releaseJob(jobId);
-            assertEquals(0, cache.getJobRefCounters().get(jobId).references);
-            assertThat(
-                    cache.getJobRefCounters().get(jobId).keepUntil,
-                    greaterThanOrEqualTo(cleanupLowerBound));
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
+            assertThat(cache.getJobRefCounters().get(jobId).keepUntil)
+                    .isGreaterThanOrEqualTo(cleanupLowerBound);
 
             // register again
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);

Review Comment:
   ```suggestion
               
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws 
IOException {
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
12345);
 
         try (PermanentBlobCache cache =
-                new PermanentBlobCache(
-                        config, temporaryFolder.newFolder(), new 
VoidBlobStore(), serverAddress)) {
+                TestingBlobUtils.createPermanentCache(tempDir, config, 
serverAddress)) {
 
             // register once
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // register a second time
             cache.registerJob(jobId);
-            assertEquals(2, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release once
             cache.releaseJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release a second time
             long cleanupLowerBound =
                     System.currentTimeMillis() + 
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
             cache.releaseJob(jobId);
-            assertEquals(0, cache.getJobRefCounters().get(jobId).references);
-            assertThat(
-                    cache.getJobRefCounters().get(jobId).keepUntil,
-                    greaterThanOrEqualTo(cleanupLowerBound));
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
+            assertThat(cache.getJobRefCounters().get(jobId).keepUntil)
+                    .isGreaterThanOrEqualTo(cleanupLowerBound);
 
             // register again
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // finally release the job
             cleanupLowerBound =
                     System.currentTimeMillis() + 
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
             cache.releaseJob(jobId);
-            assertEquals(0, cache.getJobRefCounters().get(jobId).references);
-            assertThat(
-                    cache.getJobRefCounters().get(jobId).keepUntil,
-                    greaterThanOrEqualTo(cleanupLowerBound));
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);

Review Comment:
   ```suggestion
               
assertThat(cache.getJobRefCounters().get(jobId).references).isZero();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -118,36 +107,36 @@ private void testEquals(BlobKey.BlobType blobType) {
         final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, 
RANDOM_ARRAY_1);
         final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_2);
-        assertTrue(k1.equals(k2));
-        assertTrue(k2.equals(k1));
-        assertEquals(k1.hashCode(), k2.hashCode());
-        assertFalse(k1.equals(k3));
-        assertFalse(k3.equals(k1));
-        assertFalse(k1.equals(k4));
-        assertFalse(k4.equals(k1));
+        assertThat(k1.equals(k2)).isTrue();
+        assertThat(k2.equals(k1)).isTrue();
+        assertThat(k2.hashCode()).isEqualTo(k1.hashCode());
+        assertThat(k1.equals(k3)).isFalse();
+        assertThat(k3.equals(k1)).isFalse();
+        assertThat(k1.equals(k4)).isFalse();
+        assertThat(k4.equals(k1)).isFalse();

Review Comment:
   Can be improved by `isEqualTo`, `isNotEqualTo` and `hasSameHashCodeAs`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java:
##########
@@ -582,57 +545,47 @@ public void testPutBufferFailsForJobHa() throws 
IOException {
      */
     private void testPutBufferFails(@Nullable final JobID jobId, 
BlobKey.BlobType blobType)
             throws IOException {
-        assumeTrue(!OperatingSystem.isWindows()); // setWritable doesn't work 
on Windows.
+        // setWritable doesn't work on Windows.
+        assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work 
on Windows").isFalse();
 
-        final Configuration config = new Configuration();
-        File tempFileDir = null;
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
 
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             // make sure the blob server cannot create any files in its 
storage dir
-            tempFileDir = 
server.createTemporaryFilename().getParentFile().getParentFile();
-            assertTrue(tempFileDir.setExecutable(true, false));
-            assertTrue(tempFileDir.setReadable(true, false));
-            assertTrue(tempFileDir.setWritable(false, false));
+            File tempFileDir = 
server.createTemporaryFilename().getParentFile().getParentFile();
+            assertThat(tempFileDir.setExecutable(true, false)).isTrue();
+            assertThat(tempFileDir.setReadable(true, false)).isTrue();
+            assertThat(tempFileDir.setWritable(false, false)).isTrue();
 
             byte[] data = new byte[2000000];
             rnd.nextBytes(data);
 
-            // upload the file to the server via the cache
-            exception.expect(IOException.class);
-            exception.expectMessage("PUT operation failed: ");
-
-            put(cache, jobId, data, blobType);
-
-        } finally {
-            // set writable again to make sure we can remove the directory
-            if (tempFileDir != null) {
-                //noinspection ResultOfMethodCallIgnored
-                tempFileDir.setWritable(true, false);
+            try {
+                assertThatThrownBy(() -> put(cache, jobId, data, blobType))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageStartingWith("PUT operation failed: ");
+            } finally {
+                assertThat(tempFileDir.setWritable(true, false)).isTrue();
             }
         }

Review Comment:
   This `finally` should correspond to the previous `try`, and the second `try` isn't 
necessary, right?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -157,61 +146,65 @@ private void testCompares(BlobKey.BlobType blobType) {
         final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, 
RANDOM_ARRAY_1);
         final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_2);
-        assertThat(k1.compareTo(k2), is(0));
-        assertThat(k2.compareTo(k1), is(0));
-        assertThat(k1.compareTo(k3), lessThan(0));
-        assertThat(k1.compareTo(k4), lessThan(0));
-        assertThat(k3.compareTo(k1), greaterThan(0));
-        assertThat(k4.compareTo(k1), greaterThan(0));
+        assertThat(k1.compareTo(k2)).isZero();
+        assertThat(k2.compareTo(k1)).isZero();
+        assertThat(k1.compareTo(k3)).isLessThan(0);
+        assertThat(k1.compareTo(k4)).isLessThan(0);
+        assertThat(k3.compareTo(k1)).isGreaterThan(0);
+        assertThat(k4.compareTo(k1)).isGreaterThan(0);
     }
 
     @Test
-    public void testComparesDifferentBlobType() {
+    void testComparesDifferentBlobType() {
         final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
-        assertThat(k1.compareTo(k2), greaterThan(0));
-        assertThat(k2.compareTo(k1), lessThan(0));
+        assertThat(k1.compareTo(k2)).isGreaterThan(0);
+        assertThat(k2.compareTo(k1)).isLessThan(0);

Review Comment:
   Can be simplified by `assertThat(k1).isLessThan(k2)`, `isGreaterThan` and 
`isEqualByComparingTo`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -246,8 +239,8 @@ static void verifyKeyDifferentHashEquals(BlobKey key1, 
BlobKey key2) {
      * @param key2 second blob key
      */
     static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) {
-        assertNotEquals(key1, key2);
-        assertThat(key1.getHash(), not(equalTo(key2.getHash())));
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.getHash()).isNotEqualTo(key2.getHash());

Review Comment:
   ```suggestion
           assertThat(key1).doesNotHaveSameHashCodeAs(key2);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -235,8 +228,8 @@ private void testStreams(BlobKey.BlobType blobType) throws 
IOException {
      * @param key2 second blob key
      */
     static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) {
-        assertNotEquals(key1, key2);
-        assertThat(key1.getHash(), equalTo(key2.getHash()));
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.getHash()).isEqualTo(key2.getHash());

Review Comment:
   ```suggestion
           assertThat(key1).hasSameHashCodeAs(key2);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws 
IOException {
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
12345);
 
         try (PermanentBlobCache cache =
-                new PermanentBlobCache(
-                        config, temporaryFolder.newFolder(), new 
VoidBlobStore(), serverAddress)) {
+                TestingBlobUtils.createPermanentCache(tempDir, config, 
serverAddress)) {
 
             // register once
             cache.registerJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // register a second time
             cache.registerJob(jobId);
-            assertEquals(2, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+            
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
 
             // release once
             cache.releaseJob(jobId);
-            assertEquals(1, cache.getJobRefCounters().get(jobId).references);
-            assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+            
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);

Review Comment:
   ```suggestion
               
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java:
##########
@@ -92,118 +78,24 @@ private void testGetFailsFromCorruptFile(
         final Configuration config = new Configuration();
         config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
         config.setString(
-                HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getPath());
+                HighAvailabilityOptions.HA_STORAGE_PATH, 
TempDirUtils.newFolder(tempDir).getPath());
 
         BlobStoreService blobStoreService = null;
 
         try {
             blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-            testGetFailsFromCorruptFile(
+            TestingBlobHelpers.testGetFailsFromCorruptFile(
                     jobId,
                     blobType,
                     corruptOnHAStore,
                     config,
                     blobStoreService,
-                    TEMPORARY_FOLDER.newFolder());
+                    TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
                 blobStoreService.closeAndCleanupAllData();
             }
         }
     }
-
-    /**
-     * Checks the GET operation fails when the downloaded file (from HA store) 
is corrupt, i.e. its
-     * content's hash does not match the {@link BlobKey}'s hash, using a 
permanent BLOB.
-     *
-     * @param jobId job ID
-     * @param config blob server configuration (including HA settings like 
{@link
-     *     HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
-     *     HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up 
<tt>blobStore</tt>
-     * @param blobStore shared HA blob store to use
-     */
-    public static void testGetFailsFromCorruptFile(
-            JobID jobId, Configuration config, BlobStore blobStore, File 
blobStorage)
-            throws IOException {
-
-        testGetFailsFromCorruptFile(jobId, PERMANENT_BLOB, true, config, 
blobStore, blobStorage);
-    }
-
-    /**
-     * Checks the GET operation fails when the downloaded file (from {@link 
BlobServer} or HA store)
-     * is corrupt, i.e. its content's hash does not match the {@link 
BlobKey}'s hash.
-     *
-     * @param jobId job ID or <tt>null</tt> if job-unrelated
-     * @param blobType whether the BLOB should become permanent or transient
-     * @param corruptOnHAStore whether the file should be corrupt in the HA 
store (<tt>true</tt>,
-     *     required <tt>highAvailability</tt> to be set) or on the {@link 
BlobServer}'s local store
-     *     (<tt>false</tt>)
-     * @param config blob server configuration (including HA settings like 
{@link
-     *     HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
-     *     HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up 
<tt>blobStore</tt>
-     * @param blobStore shared HA blob store to use
-     */
-    private static void testGetFailsFromCorruptFile(
-            @Nullable JobID jobId,
-            BlobKey.BlobType blobType,
-            boolean corruptOnHAStore,
-            Configuration config,
-            BlobStore blobStore,
-            File blobStorage)
-            throws IOException {
-
-        assertTrue(
-                "corrupt HA file requires a HA setup",
-                !corruptOnHAStore || blobType == PERMANENT_BLOB);
-
-        Random rnd = new Random();
-
-        try (BlobServer server =
-                        new BlobServer(config, new File(blobStorage, 
"server"), blobStore);
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                new File(blobStorage, "cache"),
-                                corruptOnHAStore ? blobStore : new 
VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
-
-            server.start();
-
-            byte[] data = new byte[2000000];
-            rnd.nextBytes(data);
-
-            // put content addressable (like libraries)
-            BlobKey key = put(server, jobId, data, blobType);
-            assertNotNull(key);
-
-            // change server/HA store file contents to make sure that GET 
requests fail
-            byte[] data2 = Arrays.copyOf(data, data.length);
-            data2[0] ^= 1;
-            if (corruptOnHAStore) {
-                File tmpFile = Files.createTempFile("blob", ".jar").toFile();
-                try {
-                    FileUtils.writeByteArrayToFile(tmpFile, data2);
-                    blobStore.put(tmpFile, jobId, key);
-                } finally {
-                    //noinspection ResultOfMethodCallIgnored
-                    tmpFile.delete();
-                }
-
-                // delete local (correct) file on server to make sure that the 
GET request does not
-                // fall back to downloading the file from the BlobServer's 
local store
-                File blobFile = server.getStorageLocation(jobId, key);
-                assertTrue(blobFile.delete());
-            } else {
-                File blobFile = server.getStorageLocation(jobId, key);
-                assertTrue(blobFile.exists());
-                FileUtils.writeByteArrayToFile(blobFile, data2);
-            }
-
-            // issue a GET request that fails
-            assertThatThrownBy(() -> get(cache, jobId, key))
-                    .satisfies(
-                            FlinkAssertions.anyCauseMatches(IOException.class, 
"data corruption"));
-        }
-    }

Review Comment:
   Why is this code removed? It isn't used, right?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -384,59 +355,53 @@ private void testGetFailsStore(@Nullable final JobID 
jobId, BlobKey.BlobType blo
      * the file. File transfers should fail.
      */
     @Test
-    public void testGetFailsHaStoreForJobHa() throws IOException {
+    void testGetFailsHaStoreForJobHa() throws IOException {
         final JobID jobId = new JobID();
 
-        final Configuration config = new Configuration();
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
 
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             // store the data on the server (and blobStore), remove from local 
server store
             byte[] data = new byte[2000000];
             rnd.nextBytes(data);
             PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId, 
data, PERMANENT_BLOB);
-            assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).delete()).isTrue();
 
             File tempFileDir = 
server.createTemporaryFilename().getParentFile();
 
-            // request the file from the server via the cache
-            exception.expect(IOException.class);
-            exception.expectMessage("Failed to fetch BLOB ");
-
             try {
-                get(cache, jobId, blobKey);
+                assertThatThrownBy(() -> get(cache, jobId, blobKey))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageStartingWith("Failed to fetch BLOB");
             } finally {
                 HashSet<String> expectedDirs = new HashSet<>();
                 expectedDirs.add("incoming");
                 expectedDirs.add(JOB_DIR_PREFIX + jobId);
                 // only the incoming and job directory should exist (no job 
directory!)
                 File storageDir = tempFileDir.getParentFile();
                 String[] actualDirs = storageDir.list();
-                assertNotNull(actualDirs);
-                assertEquals(expectedDirs, new 
HashSet<>(Arrays.asList(actualDirs)));
+                assertThat(actualDirs).isNotNull();
+                assertThat(actualDirs).isNotEmpty();
+                
assertThat(actualDirs).containsExactlyInAnyOrderElementsOf(expectedDirs);

Review Comment:
   ```suggestion
                   
assertThat(actualDirs).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(expectedDirs);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -73,42 +62,42 @@ public final class BlobKeyTest extends TestLogger {
     }
 
     @Test
-    public void testCreateKey() {
+    void testCreateKey() {
         BlobKey key = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
         verifyType(PERMANENT_BLOB, key);
-        assertArrayEquals(KEY_ARRAY_1, key.getHash());
+        assertThat(key.getHash()).isEqualTo(KEY_ARRAY_1);
 
         key = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
         verifyType(TRANSIENT_BLOB, key);
-        assertArrayEquals(KEY_ARRAY_1, key.getHash());
+        assertThat(key.getHash()).isEqualTo(KEY_ARRAY_1);
     }
 
     @Test
-    public void testSerializationTransient() throws Exception {
+    void testSerializationTransient() throws Exception {
         testSerialization(TRANSIENT_BLOB);
     }
 
     @Test
-    public void testSerializationPermanent() throws Exception {
+    void testSerializationPermanent() throws Exception {
         testSerialization(PERMANENT_BLOB);
     }
 
     /** Tests the serialization/deserialization of BLOB keys. */
     private void testSerialization(BlobKey.BlobType blobType) throws Exception 
{
         final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
-        assertEquals(k1, k2);
-        assertEquals(k1.hashCode(), k2.hashCode());
-        assertEquals(0, k1.compareTo(k2));
+        assertThat(k2).isEqualTo(k1);
+        assertThat(k2.hashCode()).isEqualTo(k1.hashCode());

Review Comment:
   ```suggestion
           assertThat(k2).hasSameHashCodeAs(k1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java:
##########
@@ -18,135 +18,41 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
-import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Tests for the recovery of files of a {@link BlobCacheService} from a HA 
store. */
-public class BlobCacheRecoveryTest extends TestLogger {
+public class BlobCacheRecoveryTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+    @TempDir private java.nio.file.Path tempDir;
 
     /**
      * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs 
are recoverable from
      * any participating BlobServer.
      */
     @Test
-    public void testBlobCacheRecovery() throws Exception {
+    void testBlobCacheRecovery() throws Exception {
         Configuration config = new Configuration();
         config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
         config.setString(
-                HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getPath());
+                HighAvailabilityOptions.HA_STORAGE_PATH, 
TempDirUtils.newFolder(tempDir).getPath());
 
         BlobStoreService blobStoreService = null;
 
         try {
             blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-            testBlobCacheRecovery(config, blobStoreService, 
TEMPORARY_FOLDER.newFolder());
+            TestingBlobHelpers.testBlobCacheRecovery(
+                    config, blobStoreService, TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
                 blobStoreService.closeAndCleanupAllData();
             }
         }
     }
-
-    /**
-     * Helper to test that the {@link BlobServer} recovery from its HA store 
works.
-     *
-     * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link 
BlobCacheService} and expects a
-     * second {@link BlobCacheService} to be able to retrieve them from a 
second {@link BlobServer}
-     * that is configured with the same HA store.
-     *
-     * @param config blob server configuration (including HA settings like 
{@link
-     *     HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
-     *     HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up 
<tt>blobStore</tt>
-     * @param blobStore shared HA blob store to use
-     * @throws IOException in case of failures
-     */
-    public static void testBlobCacheRecovery(
-            final Configuration config, final BlobStore blobStore, final File 
blobStorage)
-            throws IOException {
-
-        final String clusterId = 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
-        String storagePath =
-                config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + 
"/" + clusterId;
-        Random rand = new Random();
-
-        try (BlobServer server0 =
-                        new BlobServer(config, new File(blobStorage, 
"server0"), blobStore);
-                BlobServer server1 =
-                        new BlobServer(config, new File(blobStorage, 
"server1"), blobStore);
-                // use VoidBlobStore as the HA store to force download from 
each server's HA store
-                BlobCacheService cache0 =
-                        new BlobCacheService(
-                                config,
-                                new File(blobStorage, "cache0"),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server0.getPort()));
-                BlobCacheService cache1 =
-                        new BlobCacheService(
-                                config,
-                                new File(blobStorage, "cache1"),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server1.getPort()))) {
-
-            server0.start();
-            server1.start();
-
-            // Random data
-            byte[] expected = new byte[1024];
-            rand.nextBytes(expected);
-            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
-
-            BlobKey[] keys = new BlobKey[2];
-            BlobKey nonHAKey;
-
-            // Put job-related HA data
-            JobID[] jobId = new JobID[] {new JobID(), new JobID()};
-            keys[0] = put(cache0, jobId[0], expected, PERMANENT_BLOB); // 
Request 1
-            keys[1] = put(cache0, jobId[1], expected2, PERMANENT_BLOB); // 
Request 2
-
-            // put non-HA data
-            nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
-            verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
-            verifyKeyDifferentHashEquals(keys[1], nonHAKey);
-
-            // check that the storage directory exists
-            final Path blobServerPath = new Path(storagePath, "blob");
-            FileSystem fs = blobServerPath.getFileSystem();
-            assertTrue("Unknown storage dir: " + blobServerPath, 
fs.exists(blobServerPath));
-
-            // Verify HA requests from cache1 (connected to server1) with no 
immediate access to the
-            // file
-            verifyContents(cache1, jobId[0], keys[0], expected);
-            verifyContents(cache1, jobId[1], keys[1], expected2);
-
-            // Verify non-HA file is not accessible from server1
-            verifyDeleted(cache1, jobId[0], nonHAKey);
-        }
-    }

Review Comment:
   Same question: why was this method removed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java:
##########
@@ -196,51 +190,52 @@ private BlobInfo() {
         }
     }
 
-    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws 
IOException {
-        // Initialize the information of BLOBs
-        BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
-
-        // Put all the BLOBs into the blob server one by one
-        for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
-            blobs[i] = new BlobInfo();
-
-            // Put the BLOB into the blob server
-            blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data, 
BLOB_TYPE);
-            assertNotNull(blobs[i].blobKey);
-        }
-
-        return blobs;
-    }
-
-    private static BlobCacheService initBlobCacheServiceWithSizeLimit(
+    private BlobCacheService initBlobCacheServiceWithSizeLimit(
             Configuration config, @Nullable final InetSocketAddress 
serverAddress)
             throws IOException {
 
         final PermanentBlobCache permanentBlobCache =
                 new PermanentBlobCache(
                         config,
-                        TEMPORARY_FOLDER.newFolder(),
+                        tempDir.resolve("permanent_cache").toFile(),
                         new VoidBlobStore(),
                         serverAddress,
                         new BlobCacheSizeTracker(MAX_NUM_OF_ACCEPTED_BLOBS * 
BLOB_SIZE));
 
         final TransientBlobCache transientBlobCache =
-                new TransientBlobCache(config, TEMPORARY_FOLDER.newFolder(), 
serverAddress);
+                new TransientBlobCache(
+                        config, tempDir.resolve("transient_cache").toFile(), 
serverAddress);
 
         return new BlobCacheService(permanentBlobCache, transientBlobCache);
     }
 
+    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws 
IOException {
+        // Initialize the information of BLOBs
+        BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
+
+        // Put all the BLOBs into the blob server one by one
+        for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+            blobs[i] = new BlobInfo();
+
+            // Put the BLOB into the blob server
+            blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data, 
BLOB_TYPE);
+            assertThat(blobs[i].blobKey).isNotNull();
+        }
+
+        return blobs;
+    }
+
     private static void readFileAndVerifyContent(
             BlobService blobService, JobID jobId, BlobKey blobKey, byte[] 
expected)
             throws IOException {
 
-        assertNotNull(jobId);
-        assertNotNull(blobKey);
-        assertTrue(blobKey instanceof PermanentBlobKey);
+        assertThat(jobId).isNotNull();
+        assertThat(blobKey).isNotNull();
+        assertThat(blobKey).isInstanceOf(PermanentBlobKey.class);

Review Comment:
   ```suggestion
           assertThat(blobKey).isNotNull().isInstanceOf(PermanentBlobKey.class);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java:
##########
@@ -302,7 +279,7 @@ private void testJobCleanup(BlobKey.BlobType blobType) 
throws IOException {
 
             BlobKey key1a = put(server, jobId1, data, blobType);
             BlobKey key2 = put(server, jobId2, data, blobType);
-            assertArrayEquals(key1a.getHash(), key2.getHash());
+            assertThat(key1a.getHash()).isEqualTo(key2.getHash());

Review Comment:
   Consider `hasSameHashCodeAs` here as well — though note this assertion compares the `getHash()` byte arrays, not `hashCode()` results, so please verify the replacement would still check the intended value.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -118,36 +107,36 @@ private void testEquals(BlobKey.BlobType blobType) {
         final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, 
RANDOM_ARRAY_1);
         final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_2);
-        assertTrue(k1.equals(k2));
-        assertTrue(k2.equals(k1));
-        assertEquals(k1.hashCode(), k2.hashCode());
-        assertFalse(k1.equals(k3));
-        assertFalse(k3.equals(k1));
-        assertFalse(k1.equals(k4));
-        assertFalse(k4.equals(k1));
+        assertThat(k1.equals(k2)).isTrue();
+        assertThat(k2.equals(k1)).isTrue();
+        assertThat(k2.hashCode()).isEqualTo(k1.hashCode());
+        assertThat(k1.equals(k3)).isFalse();
+        assertThat(k3.equals(k1)).isFalse();
+        assertThat(k1.equals(k4)).isFalse();
+        assertThat(k4.equals(k1)).isFalse();
 
         //noinspection ObjectEqualsNull
-        assertFalse(k1.equals(null));
+        assertThat(k1.equals(null)).isFalse();
         //noinspection EqualsBetweenInconvertibleTypes
-        assertFalse(k1.equals(this));
+        assertThat(k1.equals(this)).isFalse();
     }
 
     /** Tests the equals method. */
     @Test
-    public void testEqualsDifferentBlobType() {
+    void testEqualsDifferentBlobType() {
         final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
         final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
-        assertFalse(k1.equals(k2));
-        assertFalse(k2.equals(k1));
+        assertThat(k1.equals(k2)).isFalse();
+        assertThat(k2.equals(k1)).isFalse();

Review Comment:
   Same as the last comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to