1996fanrui commented on code in PR #23211:
URL: https://github.com/apache/flink/pull/23211#discussion_r1299992229
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws
IOException {
InetSocketAddress serverAddress = new InetSocketAddress("localhost",
12345);
try (PermanentBlobCache cache =
- new PermanentBlobCache(
- config, temporaryFolder.newFolder(), new
VoidBlobStore(), serverAddress)) {
+ TestingBlobUtils.createPermanentCache(tempDir, config,
serverAddress)) {
// register once
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// register a second time
cache.registerJob(jobId);
- assertEquals(2, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release once
cache.releaseJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release a second time
long cleanupLowerBound =
System.currentTimeMillis() +
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
- assertEquals(0, cache.getJobRefCounters().get(jobId).references);
- assertThat(
- cache.getJobRefCounters().get(jobId).keepUntil,
- greaterThanOrEqualTo(cleanupLowerBound));
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
Review Comment:
```suggestion
assertThat(cache.getJobRefCounters().get(jobId).references).isZero();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws
IOException {
InetSocketAddress serverAddress = new InetSocketAddress("localhost",
12345);
try (PermanentBlobCache cache =
- new PermanentBlobCache(
- config, temporaryFolder.newFolder(), new
VoidBlobStore(), serverAddress)) {
+ TestingBlobUtils.createPermanentCache(tempDir, config,
serverAddress)) {
// register once
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
Review Comment:
```suggestion
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws
IOException {
InetSocketAddress serverAddress = new InetSocketAddress("localhost",
12345);
try (PermanentBlobCache cache =
- new PermanentBlobCache(
- config, temporaryFolder.newFolder(), new
VoidBlobStore(), serverAddress)) {
+ TestingBlobUtils.createPermanentCache(tempDir, config,
serverAddress)) {
// register once
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// register a second time
cache.registerJob(jobId);
- assertEquals(2, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release once
cache.releaseJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release a second time
long cleanupLowerBound =
System.currentTimeMillis() +
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
- assertEquals(0, cache.getJobRefCounters().get(jobId).references);
- assertThat(
- cache.getJobRefCounters().get(jobId).keepUntil,
- greaterThanOrEqualTo(cleanupLowerBound));
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
+ assertThat(cache.getJobRefCounters().get(jobId).keepUntil)
+ .isGreaterThanOrEqualTo(cleanupLowerBound);
// register again
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
Review Comment:
```suggestion
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws
IOException {
InetSocketAddress serverAddress = new InetSocketAddress("localhost",
12345);
try (PermanentBlobCache cache =
- new PermanentBlobCache(
- config, temporaryFolder.newFolder(), new
VoidBlobStore(), serverAddress)) {
+ TestingBlobUtils.createPermanentCache(tempDir, config,
serverAddress)) {
// register once
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// register a second time
cache.registerJob(jobId);
- assertEquals(2, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release once
cache.releaseJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release a second time
long cleanupLowerBound =
System.currentTimeMillis() +
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
- assertEquals(0, cache.getJobRefCounters().get(jobId).references);
- assertThat(
- cache.getJobRefCounters().get(jobId).keepUntil,
- greaterThanOrEqualTo(cleanupLowerBound));
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
+ assertThat(cache.getJobRefCounters().get(jobId).keepUntil)
+ .isGreaterThanOrEqualTo(cleanupLowerBound);
// register again
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// finally release the job
cleanupLowerBound =
System.currentTimeMillis() +
config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
- assertEquals(0, cache.getJobRefCounters().get(jobId).references);
- assertThat(
- cache.getJobRefCounters().get(jobId).keepUntil,
- greaterThanOrEqualTo(cleanupLowerBound));
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(0);
Review Comment:
```suggestion
assertThat(cache.getJobRefCounters().get(jobId).references).isZero();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -118,36 +107,36 @@ private void testEquals(BlobKey.BlobType blobType) {
final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2,
RANDOM_ARRAY_1);
final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_2);
- assertTrue(k1.equals(k2));
- assertTrue(k2.equals(k1));
- assertEquals(k1.hashCode(), k2.hashCode());
- assertFalse(k1.equals(k3));
- assertFalse(k3.equals(k1));
- assertFalse(k1.equals(k4));
- assertFalse(k4.equals(k1));
+ assertThat(k1.equals(k2)).isTrue();
+ assertThat(k2.equals(k1)).isTrue();
+ assertThat(k2.hashCode()).isEqualTo(k1.hashCode());
+ assertThat(k1.equals(k3)).isFalse();
+ assertThat(k3.equals(k1)).isFalse();
+ assertThat(k1.equals(k4)).isFalse();
+ assertThat(k4.equals(k1)).isFalse();
Review Comment:
Can be improved by using `isEqualTo`, `isNotEqualTo`, and `hasSameHashCodeAs`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java:
##########
@@ -582,57 +545,47 @@ public void testPutBufferFailsForJobHa() throws
IOException {
*/
private void testPutBufferFails(@Nullable final JobID jobId,
BlobKey.BlobType blobType)
throws IOException {
- assumeTrue(!OperatingSystem.isWindows()); // setWritable doesn't work
on Windows.
+ // setWritable doesn't work on Windows.
+ assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work
on Windows").isFalse();
- final Configuration config = new Configuration();
- File tempFileDir = null;
- try (BlobServer server =
- new BlobServer(config, temporaryFolder.newFolder(),
new VoidBlobStore());
- BlobCacheService cache =
- new BlobCacheService(
- config,
- temporaryFolder.newFolder(),
- new VoidBlobStore(),
- new InetSocketAddress("localhost",
server.getPort()))) {
+ Tuple2<BlobServer, BlobCacheService> serverAndCache =
+ TestingBlobUtils.createServerAndCache(tempDir);
+ try (BlobServer server = serverAndCache.f0;
+ BlobCacheService cache = serverAndCache.f1) {
server.start();
// make sure the blob server cannot create any files in its
storage dir
- tempFileDir =
server.createTemporaryFilename().getParentFile().getParentFile();
- assertTrue(tempFileDir.setExecutable(true, false));
- assertTrue(tempFileDir.setReadable(true, false));
- assertTrue(tempFileDir.setWritable(false, false));
+ File tempFileDir =
server.createTemporaryFilename().getParentFile().getParentFile();
+ assertThat(tempFileDir.setExecutable(true, false)).isTrue();
+ assertThat(tempFileDir.setReadable(true, false)).isTrue();
+ assertThat(tempFileDir.setWritable(false, false)).isTrue();
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- // upload the file to the server via the cache
- exception.expect(IOException.class);
- exception.expectMessage("PUT operation failed: ");
-
- put(cache, jobId, data, blobType);
-
- } finally {
- // set writable again to make sure we can remove the directory
- if (tempFileDir != null) {
- //noinspection ResultOfMethodCallIgnored
- tempFileDir.setWritable(true, false);
+ try {
+ assertThatThrownBy(() -> put(cache, jobId, data, blobType))
+ .isInstanceOf(IOException.class)
+ .hasMessageStartingWith("PUT operation failed: ");
+ } finally {
+ assertThat(tempFileDir.setWritable(true, false)).isTrue();
}
}
Review Comment:
This `finally` should belong to the outer try-with-resources block; the nested
`try` isn't necessary, right?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -157,61 +146,65 @@ private void testCompares(BlobKey.BlobType blobType) {
final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2,
RANDOM_ARRAY_1);
final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_2);
- assertThat(k1.compareTo(k2), is(0));
- assertThat(k2.compareTo(k1), is(0));
- assertThat(k1.compareTo(k3), lessThan(0));
- assertThat(k1.compareTo(k4), lessThan(0));
- assertThat(k3.compareTo(k1), greaterThan(0));
- assertThat(k4.compareTo(k1), greaterThan(0));
+ assertThat(k1.compareTo(k2)).isZero();
+ assertThat(k2.compareTo(k1)).isZero();
+ assertThat(k1.compareTo(k3)).isLessThan(0);
+ assertThat(k1.compareTo(k4)).isLessThan(0);
+ assertThat(k3.compareTo(k1)).isGreaterThan(0);
+ assertThat(k4.compareTo(k1)).isGreaterThan(0);
}
@Test
- public void testComparesDifferentBlobType() {
+ void testComparesDifferentBlobType() {
final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1,
RANDOM_ARRAY_1);
- assertThat(k1.compareTo(k2), greaterThan(0));
- assertThat(k2.compareTo(k1), lessThan(0));
+ assertThat(k1.compareTo(k2)).isGreaterThan(0);
+ assertThat(k2.compareTo(k1)).isLessThan(0);
Review Comment:
Can be simplified by using `assertThat(k1).isLessThan(k2)`, `isGreaterThan`, and
`isEqualByComparingTo`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -246,8 +239,8 @@ static void verifyKeyDifferentHashEquals(BlobKey key1,
BlobKey key2) {
* @param key2 second blob key
*/
static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) {
- assertNotEquals(key1, key2);
- assertThat(key1.getHash(), not(equalTo(key2.getHash())));
+ assertThat(key1).isNotEqualTo(key2);
+ assertThat(key1.getHash()).isNotEqualTo(key2.getHash());
Review Comment:
```suggestion
assertThat(key1).doesNotHaveSameHashCodeAs(key2);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -235,8 +228,8 @@ private void testStreams(BlobKey.BlobType blobType) throws
IOException {
* @param key2 second blob key
*/
static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) {
- assertNotEquals(key1, key2);
- assertThat(key1.getHash(), equalTo(key2.getHash()));
+ assertThat(key1).isNotEqualTo(key2);
+ assertThat(key1.getHash()).isEqualTo(key2.getHash());
Review Comment:
```suggestion
assertThat(key1).hasSameHashCodeAs(key2);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -166,46 +157,43 @@ public void testPermanentJobReferences() throws
IOException {
InetSocketAddress serverAddress = new InetSocketAddress("localhost",
12345);
try (PermanentBlobCache cache =
- new PermanentBlobCache(
- config, temporaryFolder.newFolder(), new
VoidBlobStore(), serverAddress)) {
+ TestingBlobUtils.createPermanentCache(tempDir, config,
serverAddress)) {
// register once
cache.registerJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// register a second time
cache.registerJob(jobId);
- assertEquals(2, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(2);
+
assertThat(cache.getJobRefCounters().get(jobId).keepUntil).isEqualTo(-1);
// release once
cache.releaseJob(jobId);
- assertEquals(1, cache.getJobRefCounters().get(jobId).references);
- assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
assertThat(cache.getJobRefCounters().get(jobId).references).isEqualTo(1);
Review Comment:
```suggestion
assertThat(cache.getJobRefCounters().get(jobId).references).isOne();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java:
##########
@@ -92,118 +78,24 @@ private void testGetFailsFromCorruptFile(
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(
- HighAvailabilityOptions.HA_STORAGE_PATH,
TEMPORARY_FOLDER.newFolder().getPath());
+ HighAvailabilityOptions.HA_STORAGE_PATH,
TempDirUtils.newFolder(tempDir).getPath());
BlobStoreService blobStoreService = null;
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testGetFailsFromCorruptFile(
+ TestingBlobHelpers.testGetFailsFromCorruptFile(
jobId,
blobType,
corruptOnHAStore,
config,
blobStoreService,
- TEMPORARY_FOLDER.newFolder());
+ TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}
-
- /**
- * Checks the GET operation fails when the downloaded file (from HA store)
is corrupt, i.e. its
- * content's hash does not match the {@link BlobKey}'s hash, using a
permanent BLOB.
- *
- * @param jobId job ID
- * @param config blob server configuration (including HA settings like
{@link
- * HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
- * HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up
<tt>blobStore</tt>
- * @param blobStore shared HA blob store to use
- */
- public static void testGetFailsFromCorruptFile(
- JobID jobId, Configuration config, BlobStore blobStore, File
blobStorage)
- throws IOException {
-
- testGetFailsFromCorruptFile(jobId, PERMANENT_BLOB, true, config,
blobStore, blobStorage);
- }
-
- /**
- * Checks the GET operation fails when the downloaded file (from {@link
BlobServer} or HA store)
- * is corrupt, i.e. its content's hash does not match the {@link
BlobKey}'s hash.
- *
- * @param jobId job ID or <tt>null</tt> if job-unrelated
- * @param blobType whether the BLOB should become permanent or transient
- * @param corruptOnHAStore whether the file should be corrupt in the HA
store (<tt>true</tt>,
- * required <tt>highAvailability</tt> to be set) or on the {@link
BlobServer}'s local store
- * (<tt>false</tt>)
- * @param config blob server configuration (including HA settings like
{@link
- * HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
- * HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up
<tt>blobStore</tt>
- * @param blobStore shared HA blob store to use
- */
- private static void testGetFailsFromCorruptFile(
- @Nullable JobID jobId,
- BlobKey.BlobType blobType,
- boolean corruptOnHAStore,
- Configuration config,
- BlobStore blobStore,
- File blobStorage)
- throws IOException {
-
- assertTrue(
- "corrupt HA file requires a HA setup",
- !corruptOnHAStore || blobType == PERMANENT_BLOB);
-
- Random rnd = new Random();
-
- try (BlobServer server =
- new BlobServer(config, new File(blobStorage,
"server"), blobStore);
- BlobCacheService cache =
- new BlobCacheService(
- config,
- new File(blobStorage, "cache"),
- corruptOnHAStore ? blobStore : new
VoidBlobStore(),
- new InetSocketAddress("localhost",
server.getPort()))) {
-
- server.start();
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- // put content addressable (like libraries)
- BlobKey key = put(server, jobId, data, blobType);
- assertNotNull(key);
-
- // change server/HA store file contents to make sure that GET
requests fail
- byte[] data2 = Arrays.copyOf(data, data.length);
- data2[0] ^= 1;
- if (corruptOnHAStore) {
- File tmpFile = Files.createTempFile("blob", ".jar").toFile();
- try {
- FileUtils.writeByteArrayToFile(tmpFile, data2);
- blobStore.put(tmpFile, jobId, key);
- } finally {
- //noinspection ResultOfMethodCallIgnored
- tmpFile.delete();
- }
-
- // delete local (correct) file on server to make sure that the
GET request does not
- // fall back to downloading the file from the BlobServer's
local store
- File blobFile = server.getStorageLocation(jobId, key);
- assertTrue(blobFile.delete());
- } else {
- File blobFile = server.getStorageLocation(jobId, key);
- assertTrue(blobFile.exists());
- FileUtils.writeByteArrayToFile(blobFile, data2);
- }
-
- // issue a GET request that fails
- assertThatThrownBy(() -> get(cache, jobId, key))
- .satisfies(
- FlinkAssertions.anyCauseMatches(IOException.class,
"data corruption"));
- }
- }
Review Comment:
Why is this code removed? It isn't used, right?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -384,59 +355,53 @@ private void testGetFailsStore(@Nullable final JobID
jobId, BlobKey.BlobType blo
* the file. File transfers should fail.
*/
@Test
- public void testGetFailsHaStoreForJobHa() throws IOException {
+ void testGetFailsHaStoreForJobHa() throws IOException {
final JobID jobId = new JobID();
- final Configuration config = new Configuration();
- try (BlobServer server =
- new BlobServer(config, temporaryFolder.newFolder(),
new VoidBlobStore());
- BlobCacheService cache =
- new BlobCacheService(
- config,
- temporaryFolder.newFolder(),
- new VoidBlobStore(),
- new InetSocketAddress("localhost",
server.getPort()))) {
+ Tuple2<BlobServer, BlobCacheService> serverAndCache =
+ TestingBlobUtils.createServerAndCache(tempDir);
+ try (BlobServer server = serverAndCache.f0;
+ BlobCacheService cache = serverAndCache.f1) {
server.start();
// store the data on the server (and blobStore), remove from local
server store
byte[] data = new byte[2000000];
rnd.nextBytes(data);
PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId,
data, PERMANENT_BLOB);
- assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+ assertThat(server.getStorageLocation(jobId,
blobKey).delete()).isTrue();
File tempFileDir =
server.createTemporaryFilename().getParentFile();
- // request the file from the server via the cache
- exception.expect(IOException.class);
- exception.expectMessage("Failed to fetch BLOB ");
-
try {
- get(cache, jobId, blobKey);
+ assertThatThrownBy(() -> get(cache, jobId, blobKey))
+ .isInstanceOf(IOException.class)
+ .hasMessageStartingWith("Failed to fetch BLOB");
} finally {
HashSet<String> expectedDirs = new HashSet<>();
expectedDirs.add("incoming");
expectedDirs.add(JOB_DIR_PREFIX + jobId);
// only the incoming and job directory should exist (no job
directory!)
File storageDir = tempFileDir.getParentFile();
String[] actualDirs = storageDir.list();
- assertNotNull(actualDirs);
- assertEquals(expectedDirs, new
HashSet<>(Arrays.asList(actualDirs)));
+ assertThat(actualDirs).isNotNull();
+ assertThat(actualDirs).isNotEmpty();
+
assertThat(actualDirs).containsExactlyInAnyOrderElementsOf(expectedDirs);
Review Comment:
```suggestion
assertThat(actualDirs).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(expectedDirs);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -73,42 +62,42 @@ public final class BlobKeyTest extends TestLogger {
}
@Test
- public void testCreateKey() {
+ void testCreateKey() {
BlobKey key = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
verifyType(PERMANENT_BLOB, key);
- assertArrayEquals(KEY_ARRAY_1, key.getHash());
+ assertThat(key.getHash()).isEqualTo(KEY_ARRAY_1);
key = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
verifyType(TRANSIENT_BLOB, key);
- assertArrayEquals(KEY_ARRAY_1, key.getHash());
+ assertThat(key.getHash()).isEqualTo(KEY_ARRAY_1);
}
@Test
- public void testSerializationTransient() throws Exception {
+ void testSerializationTransient() throws Exception {
testSerialization(TRANSIENT_BLOB);
}
@Test
- public void testSerializationPermanent() throws Exception {
+ void testSerializationPermanent() throws Exception {
testSerialization(PERMANENT_BLOB);
}
/** Tests the serialization/deserialization of BLOB keys. */
private void testSerialization(BlobKey.BlobType blobType) throws Exception
{
final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
- assertEquals(k1, k2);
- assertEquals(k1.hashCode(), k2.hashCode());
- assertEquals(0, k1.compareTo(k2));
+ assertThat(k2).isEqualTo(k1);
+ assertThat(k2.hashCode()).isEqualTo(k1.hashCode());
Review Comment:
```suggestion
assertThat(k2).hasSameHashCodeAs(k1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java:
##########
@@ -18,135 +18,41 @@
package org.apache.flink.runtime.blob;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
-import static
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
/** Tests for the recovery of files of a {@link BlobCacheService} from a HA
store. */
-public class BlobCacheRecoveryTest extends TestLogger {
+public class BlobCacheRecoveryTest {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @TempDir private java.nio.file.Path tempDir;
/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs
are recoverable from
* any participating BlobServer.
*/
@Test
- public void testBlobCacheRecovery() throws Exception {
+ void testBlobCacheRecovery() throws Exception {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(
- HighAvailabilityOptions.HA_STORAGE_PATH,
TEMPORARY_FOLDER.newFolder().getPath());
+ HighAvailabilityOptions.HA_STORAGE_PATH,
TempDirUtils.newFolder(tempDir).getPath());
BlobStoreService blobStoreService = null;
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testBlobCacheRecovery(config, blobStoreService,
TEMPORARY_FOLDER.newFolder());
+ TestingBlobHelpers.testBlobCacheRecovery(
+ config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}
-
- /**
- * Helper to test that the {@link BlobServer} recovery from its HA store
works.
- *
- * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link
BlobCacheService} and expects a
- * second {@link BlobCacheService} to be able to retrieve them from a
second {@link BlobServer}
- * that is configured with the same HA store.
- *
- * @param config blob server configuration (including HA settings like
{@link
- * HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
- * HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up
<tt>blobStore</tt>
- * @param blobStore shared HA blob store to use
- * @throws IOException in case of failures
- */
- public static void testBlobCacheRecovery(
- final Configuration config, final BlobStore blobStore, final File
blobStorage)
- throws IOException {
-
- final String clusterId =
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
- String storagePath =
- config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) +
"/" + clusterId;
- Random rand = new Random();
-
- try (BlobServer server0 =
- new BlobServer(config, new File(blobStorage,
"server0"), blobStore);
- BlobServer server1 =
- new BlobServer(config, new File(blobStorage,
"server1"), blobStore);
- // use VoidBlobStore as the HA store to force download from
each server's HA store
- BlobCacheService cache0 =
- new BlobCacheService(
- config,
- new File(blobStorage, "cache0"),
- new VoidBlobStore(),
- new InetSocketAddress("localhost",
server0.getPort()));
- BlobCacheService cache1 =
- new BlobCacheService(
- config,
- new File(blobStorage, "cache1"),
- new VoidBlobStore(),
- new InetSocketAddress("localhost",
server1.getPort()))) {
-
- server0.start();
- server1.start();
-
- // Random data
- byte[] expected = new byte[1024];
- rand.nextBytes(expected);
- byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
-
- BlobKey[] keys = new BlobKey[2];
- BlobKey nonHAKey;
-
- // Put job-related HA data
- JobID[] jobId = new JobID[] {new JobID(), new JobID()};
- keys[0] = put(cache0, jobId[0], expected, PERMANENT_BLOB); //
Request 1
- keys[1] = put(cache0, jobId[1], expected2, PERMANENT_BLOB); //
Request 2
-
- // put non-HA data
- nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
- verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
- verifyKeyDifferentHashEquals(keys[1], nonHAKey);
-
- // check that the storage directory exists
- final Path blobServerPath = new Path(storagePath, "blob");
- FileSystem fs = blobServerPath.getFileSystem();
- assertTrue("Unknown storage dir: " + blobServerPath,
fs.exists(blobServerPath));
-
- // Verify HA requests from cache1 (connected to server1) with no
immediate access to the
- // file
- verifyContents(cache1, jobId[0], keys[0], expected);
- verifyContents(cache1, jobId[1], keys[1], expected2);
-
- // Verify non-HA file is not accessible from server1
- verifyDeleted(cache1, jobId[0], nonHAKey);
- }
- }
Review Comment:
Same question: why remove these methods?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java:
##########
@@ -196,51 +190,52 @@ private BlobInfo() {
}
}
- private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws
IOException {
- // Initialize the information of BLOBs
- BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
-
- // Put all the BLOBs into the blob server one by one
- for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
- blobs[i] = new BlobInfo();
-
- // Put the BLOB into the blob server
- blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data,
BLOB_TYPE);
- assertNotNull(blobs[i].blobKey);
- }
-
- return blobs;
- }
-
- private static BlobCacheService initBlobCacheServiceWithSizeLimit(
+ private BlobCacheService initBlobCacheServiceWithSizeLimit(
Configuration config, @Nullable final InetSocketAddress
serverAddress)
throws IOException {
final PermanentBlobCache permanentBlobCache =
new PermanentBlobCache(
config,
- TEMPORARY_FOLDER.newFolder(),
+ tempDir.resolve("permanent_cache").toFile(),
new VoidBlobStore(),
serverAddress,
new BlobCacheSizeTracker(MAX_NUM_OF_ACCEPTED_BLOBS *
BLOB_SIZE));
final TransientBlobCache transientBlobCache =
- new TransientBlobCache(config, TEMPORARY_FOLDER.newFolder(),
serverAddress);
+ new TransientBlobCache(
+ config, tempDir.resolve("transient_cache").toFile(),
serverAddress);
return new BlobCacheService(permanentBlobCache, transientBlobCache);
}
+ private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws
IOException {
+ // Initialize the information of BLOBs
+ BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
+
+ // Put all the BLOBs into the blob server one by one
+ for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
+ blobs[i] = new BlobInfo();
+
+ // Put the BLOB into the blob server
+ blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data,
BLOB_TYPE);
+ assertThat(blobs[i].blobKey).isNotNull();
+ }
+
+ return blobs;
+ }
+
private static void readFileAndVerifyContent(
BlobService blobService, JobID jobId, BlobKey blobKey, byte[]
expected)
throws IOException {
- assertNotNull(jobId);
- assertNotNull(blobKey);
- assertTrue(blobKey instanceof PermanentBlobKey);
+ assertThat(jobId).isNotNull();
+ assertThat(blobKey).isNotNull();
+ assertThat(blobKey).isInstanceOf(PermanentBlobKey.class);
Review Comment:
```suggestion
assertThat(blobKey).isNotNull().isInstanceOf(PermanentBlobKey.class);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java:
##########
@@ -302,7 +279,7 @@ private void testJobCleanup(BlobKey.BlobType blobType)
throws IOException {
BlobKey key1a = put(server, jobId1, data, blobType);
BlobKey key2 = put(server, jobId2, data, blobType);
- assertArrayEquals(key1a.getHash(), key2.getHash());
+ assertThat(key1a.getHash()).isEqualTo(key2.getHash());
Review Comment:
Consider using `hasSameHashCodeAs` here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -118,36 +107,36 @@ private void testEquals(BlobKey.BlobType blobType) {
final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2,
RANDOM_ARRAY_1);
final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1,
RANDOM_ARRAY_2);
- assertTrue(k1.equals(k2));
- assertTrue(k2.equals(k1));
- assertEquals(k1.hashCode(), k2.hashCode());
- assertFalse(k1.equals(k3));
- assertFalse(k3.equals(k1));
- assertFalse(k1.equals(k4));
- assertFalse(k4.equals(k1));
+ assertThat(k1.equals(k2)).isTrue();
+ assertThat(k2.equals(k1)).isTrue();
+ assertThat(k2.hashCode()).isEqualTo(k1.hashCode());
+ assertThat(k1.equals(k3)).isFalse();
+ assertThat(k3.equals(k1)).isFalse();
+ assertThat(k1.equals(k4)).isFalse();
+ assertThat(k4.equals(k1)).isFalse();
//noinspection ObjectEqualsNull
- assertFalse(k1.equals(null));
+ assertThat(k1.equals(null)).isFalse();
//noinspection EqualsBetweenInconvertibleTypes
- assertFalse(k1.equals(this));
+ assertThat(k1.equals(this)).isFalse();
}
/** Tests the equals method. */
@Test
- public void testEqualsDifferentBlobType() {
+ void testEqualsDifferentBlobType() {
final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1,
RANDOM_ARRAY_1);
final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1,
RANDOM_ARRAY_1);
- assertFalse(k1.equals(k2));
- assertFalse(k2.equals(k1));
+ assertThat(k1.equals(k2)).isFalse();
+ assertThat(k2.equals(k1)).isFalse();
Review Comment:
Same as the last comment
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]