bhattmanish98 commented on code in PR #7386:
URL: https://github.com/apache/hadoop/pull/7386#discussion_r1979065621
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java:
##########
@@ -1655,6 +1730,827 @@ public void
testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
fs.rename(new Path(dirPathStr), new Path("/dst/"));
}
+ /**
+ * Helper method to configure the AzureBlobFileSystem and rename directories.
+ *
+ * @param currentFs The current AzureBlobFileSystem to use for renaming.
+ * @param producerQueueSize Maximum size of the producer queue.
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
+ * @param maxThread Maximum threads for the rename operation.
+ * @param src The source path of the directory to rename.
+ * @param dst The destination path of the renamed directory.
+ * @throws IOException If an I/O error occurs during the operation.
+ */
+ private void renameDir(AzureBlobFileSystem currentFs, String
producerQueueSize,
+ String consumerMaxLag, String maxThread, Path src, Path dst)
+ throws IOException {
+ Configuration config = createConfig(producerQueueSize, consumerMaxLag,
maxThread);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(currentFs.getUri(), config)) {
+ fs.rename(src, dst);
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Helper method to create the configuration for the AzureBlobFileSystem.
+ *
+ * @param producerQueueSize Maximum size of the producer queue.
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
+ * @param maxThread Maximum threads for the rename operation.
+ * @return The configuration object.
+ */
+ private Configuration createConfig(String producerQueueSize, String
consumerMaxLag, String maxThread) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
+ config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
+ config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);
+ return config;
+ }
+
+ /**
+ * Helper method to validate that the rename was successful and that the
destination exists.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param dst The destination path.
+ * @param src The source path.
+ * @throws IOException If an I/O error occurs during the validation.
+ */
+ private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+ boolean isSrcExist, boolean isDstExist, boolean isJsonExist)
+ throws IOException {
+ Assertions.assertThat(fs.exists(dst))
+ .describedAs("Renamed Destination directory should exist.")
+ .isEqualTo(isDstExist);
+ Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() +
SUFFIX)))
+ .describedAs("Renamed Pending Json file should exist.")
+ .isEqualTo(isJsonExist);
+ Assertions.assertThat(fs.exists(src))
+ .describedAs("Renamed Destination directory should exist.")
+ .isEqualTo(isSrcExist);
+ }
+
+ /**
+ * Test the renaming of a directory with different parallelism
configurations.
+ */
+ @Test
+ public void testRenameDirWithDifferentParallelism() throws Exception {
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ assumeBlobServiceType();
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(currentFs, src, TOTAL_FILES);
+
+ // Test renaming with different configurations
+ renameDir(currentFs, "10", "5", "2", src, dst);
+ renameDir(currentFs, "100", "5", "2", dst, src);
+
+ String errorMessage = intercept(PathIOException.class,
+ () -> renameDir(currentFs, "50", "50", "5", src, dst))
+ .getMessage();
+
+ // Validate error message for invalid configuration
+ Assertions.assertThat(errorMessage)
+ .describedAs("maxConsumptionLag should be lesser than maxSize")
+ .contains(
+ "Invalid configuration value detected for
\"fs.azure.blob.dir.list.consumer.max.lag\". "
+ + "maxConsumptionLag should be lesser than maxSize");
+ }
+ }
+
+ /**
+ * Helper method to create files in the given directory.
+ *
+ * @param fs The AzureBlobFileSystem instance to use for file creation.
+ * @param src The source path (directory).
+ * @param numFiles The number of files to create.
+ * @throws ExecutionException, InterruptedException If an error occurs
during file creation.
+ */
+ private void createFiles(AzureBlobFileSystem fs, Path src, int numFiles)
+ throws ExecutionException, InterruptedException {
+ ExecutorService executorService =
Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+ List<Future> futures = new ArrayList<>();
+ for (int i = 0; i < numFiles; i++) {
+ final int iter = i;
+ Future future = executorService.submit(() ->
+ fs.create(new Path(src, "file" + iter + ".txt")));
+ futures.add(future);
+ }
+ for (Future future : futures) {
+ future.get();
+ }
+ executorService.shutdown();
+ }
+
/**
 * Tests renaming a directory with a failure during the copy operation.
 * The stubbed client throws BLOB_ALREADY_EXISTS on the copy call number
 * FAILED_CALL, then the test verifies the rename is recovered (redo) on the
 * next existence validation.
 */
@Test
public void testRenameCopyFailureInBetween() throws Exception {
  // Low parallelism config (queue=5, lag=3, threads=2) to keep copy calls ordered.
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
      createConfig("5", "3", "2")))) {
    assumeBlobServiceType();
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
    fs.getAbfsStore().setClient(client);
    Path src = new Path("/hbase/A1/A2");
    Path dst = new Path("/hbase/A1/A3");

    // Create sample files in the source directory
    createFiles(fs, src, TOTAL_FILES);

    // Track the number of copy operations; fail once the counter reaches
    // FAILED_CALL to simulate a mid-rename crash.
    AtomicInteger copyCall = new AtomicInteger(0);
    Mockito.doAnswer(copyRequest -> {
      if (copyCall.get() == FAILED_CALL) {
        throw new AbfsRestOperationException(
            BLOB_ALREADY_EXISTS.getStatusCode(),
            BLOB_ALREADY_EXISTS.getErrorCode(),
            BLOB_ALREADY_EXISTS.getErrorMessage(),
            new Exception());
      }
      copyCall.incrementAndGet();
      return copyRequest.callRealMethod();
    }).when(client).copyBlob(Mockito.any(Path.class),
        Mockito.any(Path.class), Mockito.nullable(String.class),
        Mockito.any(TracingContext.class));

    fs.rename(src, dst);
    // Validate copy operation count: the injected failure must have stopped
    // the rename before every file was copied.
    Assertions.assertThat(copyCall.get())
        .describedAs("Copy operation count should be less than 10.")
        .isLessThan(TOTAL_FILES);

    // Reset the counter so the next validation can observe redo-triggered copies.
    copyCall.set(0);

    // This existence check is expected to trigger the rename-redo path
    // (pending json still present at this point).
    validateRename(fs, src, dst, false, true, true);

    // Redo must have issued at least one copy.
    Assertions.assertThat(copyCall.get())
        .describedAs("Copy operation count should be greater than 0.")
        .isGreaterThan(0);

    // Final state: source gone, destination present, pending json cleaned up.
    validateRename(fs, src, dst, false, true, false);
  }
}
+
/**
 * Tests renaming a directory with a failure during the delete operation.
 * The stubbed client throws BLOB_PATH_NOT_FOUND on the delete call number
 * FAILED_CALL, then the test verifies the rename is recovered (redo) on the
 * next existence validation.
 */
@Test
public void testRenameDeleteFailureInBetween() throws Exception {
  // Low parallelism config (queue=5, lag=3, threads=2) to keep delete calls ordered.
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
      createConfig("5", "3", "2")))) {
    assumeBlobServiceType();
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
    fs.getAbfsStore().setClient(client);
    Path src = new Path("/hbase/A1/A2");
    Path dst = new Path("/hbase/A1/A3");

    // Create sample files in the source directory
    createFiles(fs, src, TOTAL_FILES);

    // Track the number of delete operations; fail once the counter reaches
    // FAILED_CALL to simulate a mid-rename crash.
    AtomicInteger deleteCall = new AtomicInteger(0);
    Mockito.doAnswer(deleteRequest -> {
      if (deleteCall.get() == FAILED_CALL) {
        throw new AbfsRestOperationException(
            BLOB_PATH_NOT_FOUND.getStatusCode(),
            BLOB_PATH_NOT_FOUND.getErrorCode(),
            BLOB_PATH_NOT_FOUND.getErrorMessage(),
            new Exception());
      }
      deleteCall.incrementAndGet();
      return deleteRequest.callRealMethod();
    }).when(client).deleteBlobPath(Mockito.any(Path.class),
        Mockito.anyString(), Mockito.any(TracingContext.class));

    fs.rename(src, dst);

    // Validate delete operation count: the injected failure must have stopped
    // the rename before every source file was deleted.
    Assertions.assertThat(deleteCall.get())
        .describedAs("Delete operation count should be less than 10.")
        .isLessThan(TOTAL_FILES);

    // Reset the counter so the next validation can observe redo-triggered deletes.
    deleteCall.set(0);
    // This existence check is expected to trigger the rename-redo path
    // (pending json still present at this point).
    validateRename(fs, src, dst, false, true, true);

    // Redo must have issued at least one delete.
    Assertions.assertThat(deleteCall.get())
        .describedAs("Delete operation count should be greater than 0.")
        .isGreaterThan(0);

    // Final state: source gone, destination present, pending json cleaned up.
    validateRename(fs, src, dst, false, true, false);
  }
}
+
+ /**
+ * Tests renaming a file or directory when the destination path contains
+ * a colon (":"). The test ensures that:
+ * - The source directory exists before the rename.
+ * - The file is successfully renamed to the destination path.
+ * - The old source directory no longer exists after the rename.
+ * - The new destination directory exists after the rename.
+ *
+ * @throws Exception if an error occurs during file system operations
+ */
+ @Test
+ public void testRenameWhenDestinationPathContainsColon() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ fs.setWorkingDirectory(new Path(ROOT_PATH));
+ String fileName = "file";
+ Path src = new Path("/test1/");
+ Path dst = new Path("/test1:/");
+
+ // Create the file
+ fs.create(new Path(src, fileName));
+
+ // Perform the rename operation and validate the results
+ performRenameAndValidate(fs, src, dst, fileName);
+ }
+
+ /**
+ * Performs the rename operation and validates the existence of the
directories and files.
+ *
+ * @param fs the AzureBlobFileSystem instance
+ * @param src the source path to be renamed
+ * @param dst the destination path for the rename
+ * @param fileName the name of the file to be renamed
+ */
+ private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path
dst, String fileName)
+ throws IOException {
+ // Assert the source directory exists
+ Assertions.assertThat(fs.exists(src))
+ .describedAs("Old directory should exist before rename")
+ .isTrue();
+
+ // Perform rename
+ fs.rename(src, dst);
+
+ // Assert the destination directory and file exist after rename
+ Assertions.assertThat(fs.exists(new Path(dst, fileName)))
+ .describedAs("Rename should be successful")
+ .isTrue();
+
+ // Assert the source directory no longer exists
+ Assertions.assertThat(fs.exists(src))
+ .describedAs("Old directory should not exist")
+ .isFalse();
+
+ // Assert the new destination directory exists
+ Assertions.assertThat(fs.exists(dst))
+ .describedAs("New directory should exist")
+ .isTrue();
+ }
+
+ /**
+ * Tests the behavior of the atomic rename key for the root folder
+ * in Azure Blob File System. The test verifies that the atomic rename key
+ * returns false for the root folder path.
+ *
+ * @throws Exception if an error occurs during the atomic rename key check
+ */
+ @Test
+ public void testGetAtomicRenameKeyForRootFolder() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ assumeBlobServiceType();
+ AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient();
+ Assertions.assertThat(abfsBlobClient.isAtomicRenameKey("/hbase"))
+ .describedAs("Atomic rename key should return false for Root folder")
+ .isFalse();
+ }
+
+ /**
+ * Tests the behavior of the atomic rename key for non-root folders
+ * in Azure Blob File System. The test verifies that the atomic rename key
+ * works for specific folders as defined in the configuration.
+ * It checks the atomic rename key for various paths,
+ * ensuring it returns true for matching paths and false for others.
+ *
+ * @throws Exception if an error occurs during the atomic rename key check
+ */
+ @Test
+ public void testGetAtomicRenameKeyForNonRootFolder() throws Exception {
+ final AzureBlobFileSystem currentFs = getFileSystem();
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set(FS_AZURE_ATOMIC_RENAME_KEY, "/hbase,/a,/b");
+
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(currentFs.getUri(), config);
+ assumeBlobServiceType();
+ AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient();
+
+ // Test for various paths
+ validateAtomicRenameKey(abfsBlobClient, "/hbase1/test", false);
+ validateAtomicRenameKey(abfsBlobClient, "/hbase/test", true);
+ validateAtomicRenameKey(abfsBlobClient, "/a/b/c", true);
+ validateAtomicRenameKey(abfsBlobClient, "/test/a", false);
+ }
+
+ /**
+ * Validates the atomic rename key for a specific path.
+ *
+ * @param abfsBlobClient the AbfsBlobClient instance
+ * @param path the path to check for atomic rename key
+ * @param expected the expected value (true or false)
+ */
+ private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String
path, boolean expected) {
+ Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path))
+ .describedAs("Atomic rename key check for path: " + path)
+ .isEqualTo(expected);
+ }
+
/**
 * Helper method to create a rename-pending json file for {@code path} via
 * {@link RenameAtomicity#preRename()}, returning a spied filesystem whose
 * store and client are also spies (so callers can stub client methods).
 *
 * @param path parent path for which the pending rename json is created
 * @param renameJson rename json path (typically path + "-RenamePending.json")
 * @return the spied file system with the pending json in place
 * @throws IOException in case of failure
 */
public AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
    throws IOException {
  final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  // Wire spy store/client into the spy fs so later stubbing on the client
  // is observed by operations going through this filesystem.
  AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
  Mockito.doReturn(store).when(fs).getAbfsStore();
  AbfsClient client = Mockito.spy(store.getClient());
  Mockito.doReturn(client).when(store).getClient();

  fs.setWorkingDirectory(new Path(ROOT_PATH));
  // Creating a child file materializes the parent directory.
  fs.create(new Path(path, "file.txt"));

  // Etag of the source directory is required by the atomicity machinery.
  AzureBlobFileSystemStore.VersionedFileStatus fileStatus =
      (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path);

  // preRename writes the -RenamePending.json marker without completing the rename.
  new RenameAtomicity(path, new Path("/hbase/test4"), renameJson,
      getTestTracingContext(fs, true), fileStatus.getEtag(), client)
      .preRename();

  Assertions.assertThat(fs.exists(renameJson))
      .describedAs("Rename Pending Json file should exist.")
      .isTrue();

  return fs;
}
+
+ /**
+ * Test case to verify crash recovery with a single child folder.
+ *
+ * This test simulates a scenario where a pending rename JSON file exists
for a single child folder
+ * under the parent directory. It ensures that when listing the files in the
parent directory,
+ * only the child folder (with the pending rename JSON file) is returned,
and no additional files are listed.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void listCrashRecoveryWithSingleChildFolder() throws Exception {
+ AzureBlobFileSystem fs = null;
+ try {
+ Path path = new Path("/hbase/A1/A2");
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+ fs = createJsonFile(path, renameJson);
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+ Assertions.assertThat(fileStatuses.length)
+ .describedAs("List should return 1 file")
+ .isEqualTo(1);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
+ /**
+ * Test case to verify crash recovery with multiple child folders.
+ *
+ * This test simulates a scenario where a pending rename JSON file exists,
and multiple files are
+ * created in the parent directory. It ensures that when listing the files
in the parent directory,
+ * the correct number of files is returned, including the pending rename
JSON file.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void listCrashRecoveryWithMultipleChildFolder() throws Exception {
+ AzureBlobFileSystem fs = null;
+ try {
+ Path path = new Path("/hbase/A1/A2");
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+ fs = createJsonFile(path, renameJson);
+
+ fs.create(new Path("/hbase/A1/file1.txt"));
+ fs.create(new Path("/hbase/A1/file2.txt"));
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+ Assertions.assertThat(fileStatuses.length)
+ .describedAs("List should return 3 files")
+ .isEqualTo(3);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
+ /**
+ * Test case to verify crash recovery with a pending rename JSON file.
+ *
+ * This test simulates a scenario where a pending rename JSON file exists in
the parent directory,
+ * and it ensures that after the deletion of the target directory and
creation of new files,
+ * the listing operation correctly returns the remaining files without
considering the pending rename.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void listCrashRecoveryWithPendingJsonFile() throws Exception {
+ AzureBlobFileSystem fs = null;
+ try {
+ Path path = new Path("/hbase/A1/A2");
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+ fs = createJsonFile(path, renameJson);
+
+ fs.delete(path, true);
+ fs.create(new Path("/hbase/A1/file1.txt"));
+ fs.create(new Path("/hbase/A1/file2.txt"));
+
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+ Assertions.assertThat(fileStatuses.length)
+ .describedAs("List should return 2 files")
+ .isEqualTo(2);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
+ /**
+ * Test case to verify crash recovery when no pending rename JSON file
exists.
+ *
+ * This test simulates a scenario where there is no pending rename JSON file
in the directory.
+ * It ensures that the listing operation correctly returns all files in the
parent directory, including
+ * those created after the rename JSON file is deleted.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void listCrashRecoveryWithoutAnyPendingJsonFile() throws Exception {
+ AzureBlobFileSystem fs = null;
+ try {
+ Path path = new Path("/hbase/A1/A2");
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+ fs = createJsonFile(path, renameJson);
+
+ fs.delete(renameJson, true);
+ fs.create(new Path("/hbase/A1/file1.txt"));
+ fs.create(new Path("/hbase/A1/file2.txt"));
+
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+ Assertions.assertThat(fileStatuses.length)
+ .describedAs("List should return 3 files")
+ .isEqualTo(3);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
/**
 * Crash recovery listing when the "-RenamePending.json" path is a DIRECTORY
 * rather than a file: listing must return all entries, must NOT trigger a
 * redo-rename, and the json-suffixed directory must appear in the listing.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void listCrashRecoveryWithPendingJsonDir() throws Exception {
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
    assumeBlobServiceType();
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    Path path = new Path("/hbase/A1/A2");
    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
    // Deliberately create the json path as a directory, not a file.
    fs.mkdirs(renameJson);

    fs.create(new Path(path.getParent(), "file1.txt"));
    fs.create(new Path(path, "file2.txt"));

    // Count redo-rename attempts triggered during the listing.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    FileStatus[] fileStatuses = fs.listStatus(path.getParent());

    Assertions.assertThat(fileStatuses.length)
        .describedAs("List should return 3 files")
        .isEqualTo(3);

    // A json-named DIRECTORY must not be treated as a pending rename marker.
    Assertions.assertThat(redoRenameCall.get())
        .describedAs("No redo rename call should be made")
        .isEqualTo(0);

    // The json-suffixed directory itself must show up in the listing.
    Assertions.assertThat(
        Arrays.stream(fileStatuses)
            .anyMatch(status -> renameJson.toUri().getPath()
                .equals(status.getPath().toUri().getPath())))
        .describedAs("Directory with suffix -RenamePending.json should exist.")
        .isTrue();
  }
}
+
/**
 * Crash recovery during listing with TWO pending rename json files: both
 * must be recovered (two redo-rename calls) and the resulting listing of
 * the parent must contain two paths.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void listCrashRecoveryWithMultipleJsonFile() throws Exception {
  AzureBlobFileSystem fs = null;
  try {
    Path path = new Path("/hbase/A1/A2");

    // 1st Json file (created via the helper, which also spies the client)
    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
    fs = createJsonFile(path, renameJson);
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    // 2nd Json file for a sibling directory
    Path path2 = new Path("/hbase/A1/A3");
    fs.create(new Path(path2, "file3.txt"));

    Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
    AzureBlobFileSystemStore.VersionedFileStatus fileStatus =
        (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path2);

    // preRename writes the second pending json without completing the rename.
    new RenameAtomicity(path2, new Path("/hbase/test4"), renameJson2,
        getTestTracingContext(fs, true), fileStatus.getEtag(), client)
        .preRename();

    fs.create(new Path(path, "file2.txt"));

    // Count redo-rename attempts triggered during the listing.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    FileStatus[] fileStatuses = fs.listStatus(path.getParent());

    Assertions.assertThat(fileStatuses.length)
        .describedAs("List should return 2 paths")
        .isEqualTo(2);

    // One redo per pending json file.
    Assertions.assertThat(redoRenameCall.get())
        .describedAs("2 redo rename calls should be made")
        .isEqualTo(2);
  } finally {
    if (fs != null) {
      fs.close();
    }
  }
}
+
/**
 * Path status with a pending rename json present: getPathStatus on the
 * source must trigger exactly one redo-rename and then fail with
 * PATH_NOT_FOUND (the source was recovered away by the redo).
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void getPathStatusWithPendingJsonFile() throws Exception {
  AzureBlobFileSystem fs = null;
  try {
    Path path = new Path("/hbase/A1/A2");
    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
    fs = createJsonFile(path, renameJson);

    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    fs.create(new Path("/hbase/A1/file1.txt"));
    fs.create(new Path("/hbase/A1/file2.txt"));

    AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();

    // Count redo-rename attempts triggered by the status call.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    TracingContext tracingContext = new TracingContext(
        conf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
        null);

    // After the redo, the original source path no longer exists, so the
    // status call is expected to fail with PATH_NOT_FOUND.
    AzureServiceErrorCode azureServiceErrorCode = intercept(
        AbfsRestOperationException.class, () -> client.getPathStatus(
            path.toUri().getPath(), true,
            tracingContext, null)).getErrorCode();

    Assertions.assertThat(azureServiceErrorCode.getErrorCode())
        .describedAs("Path had to be recovered from atomic rename operation.")
        .isEqualTo(PATH_NOT_FOUND.getErrorCode());

    Assertions.assertThat(redoRenameCall.get())
        .describedAs("There should be one redo rename call")
        .isEqualTo(1);
  } finally {
    if (fs != null) {
      fs.close();
    }
  }
}
+
/**
 * Path status with no pending rename json: the status call must succeed
 * (HTTP 200), return a non-null ETag, and trigger no redo-rename.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void getPathStatusWithoutPendingJsonFile() throws Exception {
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
    assumeBlobServiceType();

    Path path = new Path("/hbase/A1/A2");
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    fs.create(new Path(path, "file1.txt"));
    fs.create(new Path(path, "file2.txt"));

    AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();

    // Count redo-rename attempts triggered by the status call.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(
        Mockito.any(Path.class), Mockito.anyInt(),
        Mockito.any(TracingContext.class));

    TracingContext tracingContext = new TracingContext(
        conf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
        null);

    AbfsHttpOperation abfsHttpOperation = client.getPathStatus(
        path.toUri().getPath(), true,
        tracingContext, null).getResult();

    Assertions.assertThat(abfsHttpOperation.getStatusCode())
        .describedAs("Path should be found.")
        .isEqualTo(HTTP_OK);

    Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
        .describedAs("Etag should be present.")
        .isNotNull();

    // No pending json anywhere, so no recovery may be attempted.
    Assertions.assertThat(redoRenameCall.get())
        .describedAs("There should be no redo rename call.")
        .isEqualTo(0);
  }
}
+
/**
 * Path status when the "-RenamePending.json" path is a DIRECTORY rather
 * than a file: the status call must succeed (HTTP 200) with a non-null
 * ETag, no redo-rename may be triggered, and the json-suffixed directory
 * must still exist afterwards.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void getPathStatusWithPendingJsonDir() throws Exception {
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
    assumeBlobServiceType();

    Path path = new Path("/hbase/A1/A2");
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    fs.create(new Path(path, "file1.txt"));
    fs.create(new Path(path, "file2.txt"));

    // Deliberately create the json path as a directory, not a file.
    fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX));

    AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();

    // Count redo-rename attempts triggered by the status call.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    TracingContext tracingContext = new TracingContext(
        conf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
        null);

    AbfsHttpOperation abfsHttpOperation =
        client.getPathStatus(path.toUri().getPath(), true, tracingContext,
            null).getResult();

    Assertions.assertThat(abfsHttpOperation.getStatusCode())
        .describedAs("Path should be found.")
        .isEqualTo(HTTP_OK);

    Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
        .describedAs("Etag should be present.")
        .isNotNull();

    // A json-named DIRECTORY must not be treated as a pending rename marker.
    Assertions.assertThat(redoRenameCall.get())
        .describedAs("There should be no redo rename call.")
        .isEqualTo(0);

    Assertions.assertThat(fs.exists(new Path(path.getParent(),
        path.getName() + SUFFIX)))
        .describedAs("Directory with suffix -RenamePending.json should exist.")
        .isTrue();
  }
}
+
+ /**
+ * Test case to verify the behavior when the ETag of a file changes during a
rename operation.
+ *
+ * This test simulates a scenario where the ETag of a file changes after the
creation of a
+ * rename pending JSON file. The steps include:
+ * - Creating a rename pending JSON file with an old ETag.
+ * - Deleting the original directory for an ETag change.
+ * - Creating new files in the directory.
+ * - Verifying that the copy blob call is not triggered.
+ * - Verifying that the rename atomicity operation is called once.
+ *
+ * The test ensures that the system correctly handles the ETag change during
the rename process.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void eTagChangedDuringRename() throws Exception {
Review Comment:
Added test wherever required.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java:
##########
@@ -1655,6 +1730,827 @@ public void
testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
fs.rename(new Path(dirPathStr), new Path("/dst/"));
}
+ /**
+ * Helper method to configure the AzureBlobFileSystem and rename directories.
+ *
+ * @param currentFs The current AzureBlobFileSystem to use for renaming.
+ * @param producerQueueSize Maximum size of the producer queue.
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
+ * @param maxThread Maximum threads for the rename operation.
+ * @param src The source path of the directory to rename.
+ * @param dst The destination path of the renamed directory.
+ * @throws IOException If an I/O error occurs during the operation.
+ */
+ private void renameDir(AzureBlobFileSystem currentFs, String
producerQueueSize,
+ String consumerMaxLag, String maxThread, Path src, Path dst)
+ throws IOException {
+ Configuration config = createConfig(producerQueueSize, consumerMaxLag,
maxThread);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(currentFs.getUri(), config)) {
+ fs.rename(src, dst);
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Helper method to create the configuration for the AzureBlobFileSystem.
+ *
+ * @param producerQueueSize Maximum size of the producer queue.
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
+ * @param maxThread Maximum threads for the rename operation.
+ * @return The configuration object.
+ */
+ private Configuration createConfig(String producerQueueSize, String
consumerMaxLag, String maxThread) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
+ config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
+ config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);
+ return config;
+ }
+
+ /**
+ * Helper method to validate that the rename was successful and that the
destination exists.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param dst The destination path.
+ * @param src The source path.
+ * @throws IOException If an I/O error occurs during the validation.
+ */
+ private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+ boolean isSrcExist, boolean isDstExist, boolean isJsonExist)
+ throws IOException {
+ Assertions.assertThat(fs.exists(dst))
+ .describedAs("Renamed Destination directory should exist.")
+ .isEqualTo(isDstExist);
+ Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() +
SUFFIX)))
+ .describedAs("Renamed Pending Json file should exist.")
+ .isEqualTo(isJsonExist);
+ Assertions.assertThat(fs.exists(src))
+ .describedAs("Renamed Destination directory should exist.")
+ .isEqualTo(isSrcExist);
+ }
+
+ /**
+ * Test the renaming of a directory with different parallelism
configurations.
+ */
+ @Test
+ public void testRenameDirWithDifferentParallelism() throws Exception {
Review Comment:
Taken!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]