ferenc-csaky commented on code in PR #23211:
URL: https://github.com/apache/flink/pull/23211#discussion_r1300622354
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java:
##########
@@ -92,118 +78,24 @@ private void testGetFailsFromCorruptFile(
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(
- HighAvailabilityOptions.HA_STORAGE_PATH,
TEMPORARY_FOLDER.newFolder().getPath());
+ HighAvailabilityOptions.HA_STORAGE_PATH,
TempDirUtils.newFolder(tempDir).getPath());
BlobStoreService blobStoreService = null;
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testGetFailsFromCorruptFile(
+ TestingBlobHelpers.testGetFailsFromCorruptFile(
jobId,
blobType,
corruptOnHAStore,
config,
blobStoreService,
- TEMPORARY_FOLDER.newFolder());
+ TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}
-
- /**
- * Checks the GET operation fails when the downloaded file (from HA store)
is corrupt, i.e. its
- * content's hash does not match the {@link BlobKey}'s hash, using a
permanent BLOB.
- *
- * @param jobId job ID
- * @param config blob server configuration (including HA settings like
{@link
- * HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
- * HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up
<tt>blobStore</tt>
- * @param blobStore shared HA blob store to use
- */
- public static void testGetFailsFromCorruptFile(
- JobID jobId, Configuration config, BlobStore blobStore, File
blobStorage)
- throws IOException {
-
- testGetFailsFromCorruptFile(jobId, PERMANENT_BLOB, true, config,
blobStore, blobStorage);
- }
-
- /**
- * Checks the GET operation fails when the downloaded file (from {@link
BlobServer} or HA store)
- * is corrupt, i.e. its content's hash does not match the {@link
BlobKey}'s hash.
- *
- * @param jobId job ID or <tt>null</tt> if job-unrelated
- * @param blobType whether the BLOB should become permanent or transient
- * @param corruptOnHAStore whether the file should be corrupt in the HA
store (<tt>true</tt>,
- * required <tt>highAvailability</tt> to be set) or on the {@link
BlobServer}'s local store
- * (<tt>false</tt>)
- * @param config blob server configuration (including HA settings like
{@link
- * HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
- * HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up
<tt>blobStore</tt>
- * @param blobStore shared HA blob store to use
- */
- private static void testGetFailsFromCorruptFile(
- @Nullable JobID jobId,
- BlobKey.BlobType blobType,
- boolean corruptOnHAStore,
- Configuration config,
- BlobStore blobStore,
- File blobStorage)
- throws IOException {
-
- assertTrue(
- "corrupt HA file requires a HA setup",
- !corruptOnHAStore || blobType == PERMANENT_BLOB);
-
- Random rnd = new Random();
-
- try (BlobServer server =
- new BlobServer(config, new File(blobStorage,
"server"), blobStore);
- BlobCacheService cache =
- new BlobCacheService(
- config,
- new File(blobStorage, "cache"),
- corruptOnHAStore ? blobStore : new
VoidBlobStore(),
- new InetSocketAddress("localhost",
server.getPort()))) {
-
- server.start();
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- // put content addressable (like libraries)
- BlobKey key = put(server, jobId, data, blobType);
- assertNotNull(key);
-
- // change server/HA store file contents to make sure that GET
requests fail
- byte[] data2 = Arrays.copyOf(data, data.length);
- data2[0] ^= 1;
- if (corruptOnHAStore) {
- File tmpFile = Files.createTempFile("blob", ".jar").toFile();
- try {
- FileUtils.writeByteArrayToFile(tmpFile, data2);
- blobStore.put(tmpFile, jobId, key);
- } finally {
- //noinspection ResultOfMethodCallIgnored
- tmpFile.delete();
- }
-
- // delete local (correct) file on server to make sure that the
GET request does not
- // fall back to downloading the file from the BlobServer's
local store
- File blobFile = server.getStorageLocation(jobId, key);
- assertTrue(blobFile.delete());
- } else {
- File blobFile = server.getStorageLocation(jobId, key);
- assertTrue(blobFile.exists());
- FileUtils.writeByteArrayToFile(blobFile, data2);
- }
-
- // issue a GET request that fails
- assertThatThrownBy(() -> get(cache, jobId, key))
- .satisfies(
- FlinkAssertions.anyCauseMatches(IOException.class,
"data corruption"));
- }
- }
Review Comment:
These methods were moved to `TestingBlobHelpers` because they are imported by
another module's test class. To be able to make this test class
package-private, I created a new test helper utility class.
Furthermore, I personally think it is not good practice to keep general test
utils in unit test files: it makes the test cases less readable and creates
confusion about which util logic is tightly coupled to the test class and which
functions serve a more general purpose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]