This is an automated email from the ASF dual-hosted git repository. anujmodi pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 64ea869f050 HADOOP-19522: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory (#7559) (#7633) 64ea869f050 is described below commit 64ea869f050fa9bc631120c6345a322ae04dd13c Author: Manish Bhatt <52626736+bhattmanis...@users.noreply.github.com> AuthorDate: Fri Apr 18 09:01:36 2025 -0700 HADOOP-19522: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory (#7559) (#7633) Contributed by Manish Bhatt Reviewed by Anmol Asrani, Manika Joshi, Anuj Modi Signed off by Anuj Modi<anujm...@apache.org> --- .../hadoop/fs/azurebfs/AbfsCountersImpl.java | 4 +- .../apache/hadoop/fs/azurebfs/AbfsStatistic.java | 4 +- .../fs/azurebfs/services/AbfsBlobClient.java | 21 +- .../hadoop/fs/azurebfs/services/AbfsClient.java | 8 +- .../fs/azurebfs/services/BlobRenameHandler.java | 30 +- .../fs/azurebfs/services/RenameAtomicity.java | 2 +- .../azurebfs/ITestAzureBlobFileSystemRename.java | 1639 ++++++++------------ .../ITestAzureBlobFileSystemRenameRecovery.java | 770 +++++++++ .../fs/azurebfs/services/AbfsClientTestUtil.java | 2 +- .../hadoop/fs/azurebfs/utils/AbfsTestUtils.java | 37 + 10 files changed, 1487 insertions(+), 1030 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java index fdcbc2275ff..7a941171fa4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableMetric; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS; import static 
org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND; @@ -134,7 +135,8 @@ public class AbfsCountersImpl implements AbfsCounters { SERVER_UNAVAILABLE, RENAME_RECOVERY, METADATA_INCOMPLETE_RENAME_FAILURES, - RENAME_PATH_ATTEMPTS + RENAME_PATH_ATTEMPTS, + ATOMIC_RENAME_PATH_ATTEMPTS }; private static final AbfsStatistic[] DURATION_TRACKER_LIST = { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java index 3a77e82ffb4..eba46357335 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java @@ -109,7 +109,9 @@ public enum AbfsStatistic { "Number of times rename operation failed due to metadata being " + "incomplete"), RENAME_PATH_ATTEMPTS("rename_path_attempts", - "Number of times we attempt to rename a path internally"); + "Number of times we attempt to rename a path internally"), + ATOMIC_RENAME_PATH_ATTEMPTS("atomic_rename_path_attempts", + "Number of times atomic rename attempted"); private String statName; private String statDescription; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 229bf75fc3a..dea41ad38bd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -385,8 +385,20 @@ public ListResponseData listPath(final String relativePath, final boolean recurs if (tracingContext.getOpType() == FSOperationType.LISTSTATUS 
&& op.getResult() != null && op.getResult().getStatusCode() == HTTP_OK) { - retryRenameOnAtomicEntriesInListResults(tracingContext, + boolean isRenameRecovered = retryRenameOnAtomicEntriesInListResults(tracingContext, listResponseData.getRenamePendingJsonPaths()); + if (isRenameRecovered) { + LOG.debug("Retrying list operation after rename recovery."); + // Retry the list operation to get the updated list of paths after rename recovery. + AbfsRestOperation retryListOp = getAbfsRestOperation( + AbfsRestOperationType.ListBlobs, + HTTP_METHOD_GET, + url, + requestHeaders); + retryListOp.execute(tracingContext); + listResponseData = parseListPathResults(retryListOp.getResult(), uri); + listResponseData.setOp(retryListOp); + } } if (isEmptyListResults(listResponseData) && is404CheckRequired) { @@ -425,15 +437,16 @@ public ListResponseData listPath(final String relativePath, final boolean recurs * @param tracingContext tracing context * @throws AzureBlobFileSystemException if rest operation or response parsing fails. 
*/ - private void retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext, + private boolean retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext, Map<Path, Integer> renamePendingJsonPaths) throws AzureBlobFileSystemException { if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) { - return; + return false; } for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) { retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), tracingContext); } + return true; } /**{@inheritDoc}*/ @@ -813,7 +826,7 @@ public AbfsClientRenameResult renamePath(final String source, destination, sourceEtag, isAtomicRenameKey(source), tracingContext ); try { - if (blobRenameHandler.execute()) { + if (blobRenameHandler.execute(false)) { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final URL url = createRequestUrl(destination, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c198a423dcb..3e9d9fed8f2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -83,6 +83,7 @@ import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.store.LogExactlyOnce; @@ -1540,8 +1541,11 @@ public void getMetricCall(TracingContext tracingContext) throws IOException { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, 
FILESYSTEM); - final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString()); - + // Construct the URL for the metric call + // In case of blob storage, the URL is changed to DFS URL + final URL url = UriUtils.changeUrlFromBlobToDfs( + createRequestUrl(new URL(abfsMetricUrl), + EMPTY_STRING, abfsUriQueryBuilder.toString())); final AbfsRestOperation op = getAbfsRestOperation( AbfsRestOperationType.GetFileSystemProperties, HTTP_METHOD_HEAD, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java index f78228bfcff..1f22d049ece 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; @@ -115,14 +116,15 @@ int getMaxConsumptionParallelism() { /** * Orchestrates the rename operation. 
+ * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation * * @return AbfsClientRenameResult containing the result of the rename operation * @throws AzureBlobFileSystemException if server call fails */ - public boolean execute() throws AzureBlobFileSystemException { + public boolean execute(final boolean isRenameRecovery) throws AzureBlobFileSystemException { PathInformation pathInformation = getPathInformation(src, tracingContext); boolean result = false; - if (preCheck(src, dst, pathInformation)) { + if (preCheck(src, dst, pathInformation, isRenameRecovery)) { RenameAtomicity renameAtomicity = null; if (pathInformation.getIsDirectory() && pathInformation.getIsImplicit()) { @@ -147,6 +149,8 @@ public boolean execute() throws AzureBlobFileSystemException { * recovers the lease on a log file, to gain exclusive access to it, before * it splits it. */ + getAbfsClient().getAbfsCounters() + .incrementCounter(AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS, 1); if (srcAbfsLease == null) { srcAbfsLease = takeLease(src, srcEtag); } @@ -192,6 +196,13 @@ private boolean finalSrcRename() throws AzureBlobFileSystemException { tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1); try { return renameInternal(src, dst); + } catch(AbfsRestOperationException e) { + if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + // If the destination path already exists, then delete the source path. 
+ getAbfsClient().deleteBlobPath(src, null, tracingContext); + return true; + } + throw e; } finally { tracingContext.setOperatedBlobCount(null); } @@ -249,16 +260,17 @@ private boolean containsColon(Path p) { * @param src source path * @param dst destination path * @param pathInformation object in which path information of the source path would be stored + * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation * * @return true if the pre-checks pass * @throws AzureBlobFileSystemException if server call fails or given paths are invalid. */ private boolean preCheck(final Path src, final Path dst, - final PathInformation pathInformation) + final PathInformation pathInformation, final boolean isRenameRecovery) throws AzureBlobFileSystemException { validateDestinationIsNotSubDir(src, dst); validateSourcePath(pathInformation); - validateDestinationPathNotExist(src, dst, pathInformation); + validateDestinationPathNotExist(src, dst, pathInformation, isRenameRecovery); validateDestinationParentExist(src, dst, pathInformation); return true; @@ -319,13 +331,13 @@ private void validateSourcePath(final PathInformation pathInformation) * @param src source path * @param dst destination path * @param pathInformation object containing the path information of the source path + * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation * * @throws AbfsRestOperationException if the destination path already exists */ private void validateDestinationPathNotExist(final Path src, - final Path dst, - final PathInformation pathInformation) - throws AzureBlobFileSystemException { + final Path dst, final PathInformation pathInformation, + final boolean isRenameRecovery) throws AzureBlobFileSystemException { /* * Destination path name can be same to that of source path name only in the * case of a directory rename. 
@@ -333,8 +345,8 @@ private void validateDestinationPathNotExist(final Path src, * In case the directory is being renamed to some other name, the destination * check would happen on the AzureBlobFileSystem#rename method. */ - if (pathInformation.getIsDirectory() && dst.getName() - .equals(src.getName())) { + if (!isRenameRecovery && pathInformation.getIsDirectory() + && dst.getName().equals(src.getName())) { PathInformation dstPathInformation = getPathInformation( dst, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index 42e8f6ed3aa..860d9eb5277 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -152,7 +152,7 @@ public void redo() throws AzureBlobFileSystemException { abfsClient, srcEtag, true, true, tracingContext); - blobRenameHandler.execute(); + blobRenameHandler.execute(true); } } finally { deleteRenamePendingJson(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index e2f8b679fc5..408f94d78d8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -24,10 +24,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -51,7 +49,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; @@ -94,17 +91,15 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; -import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; -import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; -import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY; import static 
org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; @@ -126,10 +121,6 @@ public class ITestAzureBlobFileSystemRename extends private static final int TOTAL_FILES = 25; - private static final int TOTAL_THREADS_IN_POOL = 5; - - private static final int FAILED_CALL = 15; - public ITestAzureBlobFileSystemRename() throws Exception { super(); } @@ -527,55 +518,6 @@ public void testRenamePendingJsonIsRemovedPostSuccessfulRename() (int) correctDeletePathCount[0]); } - /** - * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification - * of client interactions in tests. It replaces the actual store and client with mocked versions. - * - * @param fs the AzureBlobFileSystem instance - * @return the spied AbfsClient for interaction verification - */ - private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(store).when(fs).getAbfsStore(); - AbfsClient client = Mockito.spy(store.getClient()); - Mockito.doReturn(client).when(store).getClient(); - return client; - } - - /** - * A helper method to set up the test environment and execute the common logic for handling - * failed rename operations and recovery in HBase. This method performs the necessary setup - * (creating directories and files) and then triggers the `crashRenameAndRecover` method - * with a provided recovery action. - * - * This method is used by different tests that require different recovery actions, such as - * performing `listStatus` or checking the existence of a path after a failed rename. 
- * - * @param fs the AzureBlobFileSystem instance to be used in the test - * @param client the AbfsBlobClient instance to be used in the test - * @param srcPath the source path for the rename operation - * @param failedCopyPath the path that simulates a failed copy during rename - * @param recoveryAction the specific recovery action to be performed after the rename failure - * (e.g., listing directory status or checking path existence) - * @throws Exception if any error occurs during setup or execution of the recovery action - */ - private void setupAndTestHBaseFailedRenameRecovery( - final AzureBlobFileSystem fs, - final AbfsBlobClient client, - final String srcPath, - final String failedCopyPath, - final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryAction) - throws Exception { - fs.setWorkingDirectory(new Path("/")); - fs.mkdirs(new Path(srcPath)); - fs.mkdirs(new Path(srcPath, "test3")); - fs.create(new Path(srcPath + "/test3/file")); - fs.create(new Path(failedCopyPath)); - fs.mkdirs(new Path("hbase/test4/")); - fs.create(new Path("hbase/test4/file1")); - crashRenameAndRecover(fs, client, srcPath, recoveryAction); - } - /** * Test for a directory in /hbase directory. To simulate the crash of process, * test will throw an exception with 403 on a copy of one of the blob.<br> @@ -622,87 +564,6 @@ public void testHBaseHandlingForFailedRenameWithGetFileStatusRecovery() }); } - - /** - * Simulates a rename failure, performs a recovery action, and verifies that the "RenamePendingJson" - * file is deleted. It checks that the rename operation is successfully completed after recovery. 
- * - * @param fs the AzureBlobFileSystem instance - * @param client the AbfsBlobClient instance - * @param srcPath the source path for the rename operation - * @param recoveryCallable the recovery action to perform - * @throws Exception if an error occurs during recovery or verification - */ - private void crashRenameAndRecover(final AzureBlobFileSystem fs, - AbfsBlobClient client, - final String srcPath, - final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryCallable) - throws Exception { - crashRename(fs, client, srcPath); - AzureBlobFileSystem fs2 = Mockito.spy(getFileSystem()); - fs2.setWorkingDirectory(new Path(ROOT_PATH)); - client = (AbfsBlobClient) addSpyHooksOnClient(fs2); - int[] renameJsonDeleteCounter = new int[1]; - Mockito.doAnswer(answer -> { - if ((ROOT_PATH + srcPath + SUFFIX) - .equalsIgnoreCase(((Path) answer.getArgument(0)).toUri().getPath())) { - renameJsonDeleteCounter[0] = 1; - } - return answer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - recoveryCallable.apply(fs2); - Assertions.assertThat(renameJsonDeleteCounter[0]) - .describedAs("RenamePendingJson should be deleted") - .isEqualTo(1); - //List would complete the rename orchestration. - assertFalse(fs2.exists(new Path("hbase/test1/test2"))); - assertFalse(fs2.exists(new Path("hbase/test1/test2/test3"))); - assertTrue(fs2.exists(new Path("hbase/test4/test2/test3"))); - assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file"))); - assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file"))); - assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file1"))); - assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file1"))); - } - - /** - * Simulates a rename failure by triggering an `AbfsRestOperationException` during the rename process. - * It intercepts the exception and ensures that all leases acquired during the atomic rename are released. 
- * - * @param fs the AzureBlobFileSystem instance used for the rename operation - * @param client the AbfsBlobClient instance used for mocking the rename failure - * @param srcPath the source path for the rename operation - * @throws Exception if an error occurs during the simulated failure or lease release - */ - private void crashRename(final AzureBlobFileSystem fs, - final AbfsBlobClient client, - final String srcPath) throws Exception { - BlobRenameHandler[] blobRenameHandlers = new BlobRenameHandler[1]; - AbfsClientTestUtil.mockGetRenameBlobHandler(client, - blobRenameHandler -> { - blobRenameHandlers[0] = blobRenameHandler; - return null; - }); - //Fail rename orchestration on path hbase/test1/test2/test3/file1 - Mockito.doThrow(new AbfsRestOperationException(HTTP_FORBIDDEN, "", "", - new Exception())) - .when(client) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - LambdaTestUtils.intercept(AccessDeniedException.class, () -> { - fs.rename(new Path(srcPath), - new Path("hbase/test4")); - }); - //Release all the leases taken by atomic rename orchestration - List<AbfsLease> leases = new ArrayList<>(blobRenameHandlers[0].getLeases()); - for (AbfsLease lease : leases) { - lease.free(); - } - } - /** * Simulates a scenario where HMaster in Hbase starts up and executes listStatus * API on the directory that has to be renamed by some other executor-machine. @@ -721,93 +582,6 @@ public void testHbaseListStatusBeforeRenamePendingFileAppendedWithIngressOnBlob( testAtomicityRedoInvalidFile(fs); } - /** - * Tests renaming a directory in AzureBlobFileSystem when the creation of the "RenamePendingJson" - * file fails on the first attempt. It ensures the renaming operation is retried. - * - * The test verifies that the creation of the "RenamePendingJson" file is attempted twice: - * once on failure and once on retry. 
- * - * @param fs the AzureBlobFileSystem instance for the test - * @throws Exception if an error occurs during the test - */ - private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) - throws Exception { - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - Path src = new Path("hbase/test1/test2"); - Path dest = new Path("hbase/test4"); - fs.mkdirs(src); - fs.mkdirs(new Path(src, "test3")); - final int[] renamePendingJsonWriteCounter = new int[1]; - /* - * Fail the creation of RenamePendingJson file on the first attempt. - */ - Answer renamePendingJsonCreateAns = createAnswer -> { - Path path = createAnswer.getArgument(0); - Mockito.doAnswer(clientFlushAns -> { - if (renamePendingJsonWriteCounter[0]++ == 0) { - fs.delete(path, true); - } - return clientFlushAns.callRealMethod(); - }) - .when(client) - .flush(Mockito.any(byte[].class), Mockito.anyString(), - Mockito.anyBoolean(), Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.anyString(), - Mockito.nullable(ContextEncryptionAdapter.class), - Mockito.any(TracingContext.class)); - return createAnswer.callRealMethod(); - }; - RenameAtomicityTestUtils.addCreatePathMock(client, - renamePendingJsonCreateAns); - fs.rename(src, dest); - Assertions.assertThat(renamePendingJsonWriteCounter[0]) - .describedAs("Creation of RenamePendingJson should be attempted twice") - .isEqualTo(2); - } - - /** - * Tests the behavior of the redo operation when an invalid "RenamePendingJson" file exists. - * It verifies that the file is deleted and that no copy operation is performed. - * - * The test simulates a scenario where the "RenamePendingJson" file is partially written and - * ensures that the `redo` method correctly deletes the file and does not trigger a copy operation. 
- * - * @param fs the AzureBlobFileSystem instance for the test - * @throws Exception if an error occurs during the test - */ - private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs) - throws Exception { - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - Path path = new Path("/hbase/test1/test2"); - fs.mkdirs(new Path(path, "test3")); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - OutputStream os = fs.create(renameJson); - os.write("{".getBytes(StandardCharsets.UTF_8)); - os.close(); - int[] renameJsonDeleteCounter = new int[1]; - Mockito.doAnswer(deleteAnswer -> { - Path ansPath = deleteAnswer.getArgument(0); - if (renameJson.toUri() - .getPath() - .equalsIgnoreCase(ansPath.toUri().getPath())) { - renameJsonDeleteCounter[0]++; - } - return deleteAnswer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - new RenameAtomicity(renameJson, 1, - getTestTracingContext(fs, true), null, client).redo(); - Assertions.assertThat(renameJsonDeleteCounter[0]) - .describedAs("RenamePendingJson should be deleted") - .isEqualTo(1); - Mockito.verify(client, Mockito.times(0)).copyBlob(Mockito.any(Path.class), - Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - } - /** * Test to check the atomicity of rename operation. The rename operation should * be atomic and should not leave any intermediate state. @@ -1031,31 +805,6 @@ public void testBlobRenameSrcDirHasNoMarker() throws Exception { assertTrue(fs.exists(new Path("/test2/test1"))); } - /** - * Mocks the progress status for a copy blob operation. - * This method simulates a copy operation that is pending and not yet completed. - * It intercepts the `copyBlob` method and modifies its response to return a "COPY_STATUS_PENDING" - * status for the copy operation. 
- * - * @param spiedClient The {@link AbfsBlobClient} instance that is being spied on. - * @throws AzureBlobFileSystemException if the mock setup fails. - */ - private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient spiedClient) - throws AzureBlobFileSystemException { - Mockito.doAnswer(answer -> { - AbfsRestOperation op = Mockito.spy( - (AbfsRestOperation) answer.callRealMethod()); - AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); - Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader( - HttpHeaderConfigurations.X_MS_COPY_STATUS); - Mockito.doReturn(httpOp).when(op).getResult(); - return op; - }) - .when(spiedClient) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); - } - /** * Verifies the behavior of a blob copy operation that takes time to complete. * The test ensures the following: @@ -1091,54 +840,14 @@ public void testCopyBlobTakeTime() throws Exception { } /** - * Mocks the final status of a blob copy operation. - * This method ensures that when checking the status of a copy operation in progress, - * it returns the specified final status (e.g., success, failure, aborted). - * - * @param spiedClient The mocked Azure Blob client to apply the mock behavior. - * @param requiredCopyFinalStatus The final status of the copy operation to be returned - * (e.g., COPY_STATUS_FAILED, COPY_STATUS_ABORTED). 
- */ - private void addMockForCopyOperationFinalStatus(final AbfsBlobClient spiedClient, - final String requiredCopyFinalStatus) { - AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient, - blobRenameHandler -> { - Mockito.doAnswer(onHandleCopyInProgress -> { - Path handlePath = onHandleCopyInProgress.getArgument(0); - TracingContext tracingContext = onHandleCopyInProgress.getArgument( - 1); - Mockito.doAnswer(onStatusCheck -> { - AbfsRestOperation op = Mockito.spy( - (AbfsRestOperation) onStatusCheck.callRealMethod()); - AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); - Mockito.doReturn(requiredCopyFinalStatus) - .when(httpOp) - .getResponseHeader( - HttpHeaderConfigurations.X_MS_COPY_STATUS); - Mockito.doReturn(httpOp).when(op).getResult(); - return op; - }) - .when(spiedClient) - .getPathStatus(handlePath.toUri().getPath(), - tracingContext, null, false); - return onHandleCopyInProgress.callRealMethod(); - }) - .when(blobRenameHandler) - .handleCopyInProgress(Mockito.any(Path.class), - Mockito.any(TracingContext.class), Mockito.any(String.class)); - return null; - }); - } - - /** - * Verifies the behavior when a blob copy operation takes time and eventually fails. - * The test ensures the following: - * <ul> - * <li>A file is created and a copy operation is initiated.</li> - * <li>The copy operation is mocked to eventually fail.</li> - * <li>The rename operation triggers an exception due to the failed copy.</li> - * <li>The test checks that the appropriate 'COPY_FAILED' error code and status code are returned.</li> - * </ul> + * Verifies the behavior when a blob copy operation takes time and eventually fails. 
+ * The test ensures the following: + * <ul> + * <li>A file is created and a copy operation is initiated.</li> + * <li>The copy operation is mocked to eventually fail.</li> + * <li>The rename operation triggers an exception due to the failed copy.</li> + * <li>The test checks that the appropriate 'COPY_FAILED' error code and status code are returned.</li> + * </ul> * * @throws Exception if an error occurs during the test execution */ @@ -1734,65 +1443,6 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() fs.rename(new Path(dirPathStr), new Path("/dst/")); } - /** - * Helper method to configure the AzureBlobFileSystem and rename directories. - * - * @param currentFs The current AzureBlobFileSystem to use for renaming. - * @param producerQueueSize Maximum size of the producer queue. - * @param consumerMaxLag Maximum lag allowed for the consumer. - * @param maxThread Maximum threads for the rename operation. - * @param src The source path of the directory to rename. - * @param dst The destination path of the renamed directory. - * @throws IOException If an I/O error occurs during the operation. - */ - private void renameDir(AzureBlobFileSystem currentFs, String producerQueueSize, - String consumerMaxLag, String maxThread, Path src, Path dst) - throws IOException { - Configuration config = createConfig(producerQueueSize, consumerMaxLag, maxThread); - try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) { - fs.rename(src, dst); - validateRename(fs, src, dst, false, true, false); - } - } - - /** - * Helper method to create the configuration for the AzureBlobFileSystem. - * - * @param producerQueueSize Maximum size of the producer queue. - * @param consumerMaxLag Maximum lag allowed for the consumer. - * @param maxThread Maximum threads for the rename operation. - * @return The configuration object. 
- */ - private Configuration createConfig(String producerQueueSize, String consumerMaxLag, String maxThread) { - Configuration config = new Configuration(this.getRawConfiguration()); - config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize); - config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag); - config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread); - return config; - } - - /** - * Helper method to validate that the rename was successful and that the destination exists. - * - * @param fs The AzureBlobFileSystem instance to check the existence on. - * @param dst The destination path. - * @param src The source path. - * @throws IOException If an I/O error occurs during the validation. - */ - private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, - boolean isSrcExist, boolean isDstExist, boolean isJsonExist) - throws IOException { - Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + SUFFIX))) - .describedAs("Renamed Pending Json file should exist.") - .isEqualTo(isJsonExist); - Assertions.assertThat(fs.exists(src)) - .describedAs("Renamed Source directory should not exist.") - .isEqualTo(isSrcExist); - Assertions.assertThat(fs.exists(dst)) - .describedAs("Renamed Destination directory should exist.") - .isEqualTo(isDstExist); - } - /** * Test the renaming of a directory with different parallelism configurations. */ @@ -1804,7 +1454,7 @@ public void testRenameDirWithDifferentParallelismConfig() throws Exception { Path dst = new Path("/hbase/A1/A3"); // Create sample files in the source directory - createFiles(currentFs, src, TOTAL_FILES); + createFiles(currentFs, src, TOTAL_FILES); // Test renaming with different configurations renameDir(currentFs, "10", "5", "2", src, dst); @@ -1823,138 +1473,6 @@ public void testRenameDirWithDifferentParallelismConfig() throws Exception { } } - /** - * Helper method to create files in the given directory. 
- * - * @param fs The AzureBlobFileSystem instance to use for file creation. - * @param src The source path (directory). - * @param numFiles The number of files to create. - * @throws ExecutionException, InterruptedException If an error occurs during file creation. - */ - private void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) - throws ExecutionException, InterruptedException { - ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); - List<Future> futures = new ArrayList<>(); - for (int i = 0; i < numFiles; i++) { - final int iter = i; - Future future = executorService.submit(() -> - fs.create(new Path(src, "file" + iter + ".txt"))); - futures.add(future); - } - for (Future future : futures) { - future.get(); - } - executorService.shutdown(); - } - - /** - * Tests renaming a directory with a failure during the copy operation. - * Simulates an error when copying on the 6th call. - */ - @Test - public void testRenameCopyFailureInBetween() throws Exception { - try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem( - createConfig("5", "3", "2")))) { - assumeBlobServiceType(); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - fs.getAbfsStore().setClient(client); - Path src = new Path("/hbase/A1/A2"); - Path dst = new Path("/hbase/A1/A3"); - - // Create sample files in the source directory - createFiles(fs, src, TOTAL_FILES); - - // Track the number of copy operations - AtomicInteger copyCall = new AtomicInteger(0); - Mockito.doAnswer(copyRequest -> { - if (copyCall.get() == FAILED_CALL) { - throw new AbfsRestOperationException( - BLOB_ALREADY_EXISTS.getStatusCode(), - BLOB_ALREADY_EXISTS.getErrorCode(), - BLOB_ALREADY_EXISTS.getErrorMessage(), - new Exception()); - } - copyCall.incrementAndGet(); - return copyRequest.callRealMethod(); - }).when(client).copyBlob(Mockito.any(Path.class), - Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - - 
fs.rename(src, dst); - // Validate copy operation count - Assertions.assertThat(copyCall.get()) - .describedAs("Copy operation count should be less than 10.") - .isLessThan(TOTAL_FILES); - - // Validate that rename redo operation was triggered - copyCall.set(0); - - // Assertions to validate renamed destination and source - validateRename(fs, src, dst, false, true, true); - - Assertions.assertThat(copyCall.get()) - .describedAs("Copy operation count should be greater than 0.") - .isGreaterThan(0); - - // Validate final state of destination and source - validateRename(fs, src, dst, false, true, false); - } - } - - /** - * Tests renaming a directory with a failure during the delete operation. - * Simulates an error on the 6th delete operation and verifies the behavior. - */ - @Test - public void testRenameDeleteFailureInBetween() throws Exception { - try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem( - createConfig("5", "3", "2")))) { - assumeBlobServiceType(); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - fs.getAbfsStore().setClient(client); - Path src = new Path("/hbase/A1/A2"); - Path dst = new Path("/hbase/A1/A3"); - - // Create sample files in the source directory - createFiles(fs, src, TOTAL_FILES); - - // Track the number of delete operations - AtomicInteger deleteCall = new AtomicInteger(0); - Mockito.doAnswer(deleteRequest -> { - if (deleteCall.get() == FAILED_CALL) { - throw new AbfsRestOperationException( - BLOB_PATH_NOT_FOUND.getStatusCode(), - BLOB_PATH_NOT_FOUND.getErrorCode(), - BLOB_PATH_NOT_FOUND.getErrorMessage(), - new Exception()); - } - deleteCall.incrementAndGet(); - return deleteRequest.callRealMethod(); - }).when(client).deleteBlobPath(Mockito.any(Path.class), - Mockito.anyString(), Mockito.any(TracingContext.class)); - - fs.rename(src, dst); - - // Validate delete operation count - Assertions.assertThat(deleteCall.get()) - .describedAs("Delete operation count should be less than 10.") - 
.isLessThan(TOTAL_FILES); - - // Validate that delete redo operation was triggered - deleteCall.set(0); - // Assertions to validate renamed destination and source - validateRename(fs, src, dst, false, true, true); - - Assertions.assertThat(deleteCall.get()) - .describedAs("Delete operation count should be greater than 0.") - .isGreaterThan(0); - - // Validate final state of destination and source - // Validate that delete redo operation was triggered - validateRename(fs, src, dst, false, true, false); - } - } - /** * Tests renaming a file or directory when the destination path contains * a colon (":"). The test ensures that: @@ -1980,40 +1498,6 @@ public void testRenameWhenDestinationPathContainsColon() throws Exception { performRenameAndValidate(fs, src, dst, fileName); } - /** - * Performs the rename operation and validates the existence of the directories and files. - * - * @param fs the AzureBlobFileSystem instance - * @param src the source path to be renamed - * @param dst the destination path for the rename - * @param fileName the name of the file to be renamed - */ - private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path dst, String fileName) - throws IOException { - // Assert the source directory exists - Assertions.assertThat(fs.exists(src)) - .describedAs("Old directory should exist before rename") - .isTrue(); - - // Perform rename - fs.rename(src, dst); - - // Assert the destination directory and file exist after rename - Assertions.assertThat(fs.exists(new Path(dst, fileName))) - .describedAs("Rename should be successful") - .isTrue(); - - // Assert the source directory no longer exists - Assertions.assertThat(fs.exists(src)) - .describedAs("Old directory should not exist") - .isFalse(); - - // Assert the new destination directory exists - Assertions.assertThat(fs.exists(dst)) - .describedAs("New directory should exist") - .isTrue(); - } - /** * Tests the behavior of the atomic rename key for the root folder * in Azure Blob File 
System. The test verifies that the atomic rename key @@ -2058,413 +1542,81 @@ public void testGetAtomicRenameKeyForNonRootFolder() throws Exception { } /** - * Validates the atomic rename key for a specific path. + * Test case to verify path status when there is no pending rename JSON file. * - * @param abfsBlobClient the AbfsBlobClient instance - * @param path the path to check for atomic rename key - * @param expected the expected value (true or false) + * This test ensures that when no rename pending JSON file is present, the path status is + * successfully retrieved, the ETag is present, and no redo rename operation is triggered. + * + * @throws Exception if any error occurs during the test execution */ - private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String path, boolean expected) { - Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path)) - .describedAs("Atomic rename key check for path: " + path) - .isEqualTo(expected); - } + @Test + public void testGetPathStatusWithoutPendingJsonFile() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); - /** - * Helper method to create a json file. 
- * @param path parent path - * @param renameJson rename json path - * @return file system - * @throws IOException in case of failure - */ - public AzureBlobFileSystem createJsonFile(Path path, Path renameJson) - throws IOException { - final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); - assumeBlobServiceType(); - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(store).when(fs).getAbfsStore(); - AbfsClient client = Mockito.spy(store.getClient()); - Mockito.doReturn(client).when(store).getClient(); + Path path = new Path("/hbase/A1/A2"); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - fs.setWorkingDirectory(new Path(ROOT_PATH)); - fs.create(new Path(path, "file.txt")); + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); - VersionedFileStatus fileStatus - = (VersionedFileStatus) fs.getFileStatus(path); + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); - new RenameAtomicity(path, new Path("/hbase/test4"), - renameJson, getTestTracingContext(fs, true), - fileStatus.getEtag(), client) - .preRename(); + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity( + Mockito.any(Path.class), Mockito.anyInt(), + Mockito.any(TracingContext.class)); - Assertions.assertThat(fs.exists(renameJson)) - .describedAs("Rename Pending Json file should exist.") - .isTrue(); + TracingContext tracingContext = new TracingContext( + conf.getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, + null); - return fs; - } + AbfsHttpOperation abfsHttpOperation = client.getPathStatus( + path.toUri().getPath(), true, + tracingContext, null).getResult(); - /** - * Test case to verify crash recovery with a single child folder. 
- * - * This test simulates a scenario where a pending rename JSON file exists for a single child folder - * under the parent directory. It ensures that when listing the files in the parent directory, - * only the child folder (with the pending rename JSON file) is returned, and no additional files are listed. - * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testListCrashRecoveryWithSingleChildFolder() throws Exception { - AzureBlobFileSystem fs = null; - try { - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); + Assertions.assertThat(abfsHttpOperation.getStatusCode()) + .describedAs("Path should be found.") + .isEqualTo(HTTP_OK); - FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + Assertions.assertThat(extractEtagHeader(abfsHttpOperation)) + .describedAs("Etag should be present.") + .isNotNull(); - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 1 file") - .isEqualTo(1); - } finally { - if (fs != null) { - fs.close(); - } + Assertions.assertThat(redoRenameCall.get()) + .describedAs("There should be no redo rename call.") + .isEqualTo(0); } } /** - * Test case to verify crash recovery with multiple child folders. + * Test case to verify path status when there is a pending rename JSON directory. * - * This test simulates a scenario where a pending rename JSON file exists, and multiple files are - * created in the parent directory. It ensures that when listing the files in the parent directory, - * the correct number of files is returned, including the pending rename JSON file. + * This test simulates the scenario where a directory is created with a rename pending JSON + * file (indicated by a specific suffix). It ensures that the path is found, the ETag is present, + * and no redo rename operation is triggered. 
It also verifies that the rename pending directory + * exists. * * @throws Exception if any error occurs during the test execution */ @Test - public void testListCrashRecoveryWithMultipleChildFolder() throws Exception { - AzureBlobFileSystem fs = null; - try { + public void testGetPathStatusWithPendingJsonDir() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); + Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - fs.create(new Path("/hbase/A1/file1.txt")); - fs.create(new Path("/hbase/A1/file2.txt")); + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); - FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX)); - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 3 files") - .isEqualTo(3); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * Test case to verify crash recovery with a pending rename JSON file. - * - * This test simulates a scenario where a pending rename JSON file exists in the parent directory, - * and it ensures that after the deletion of the target directory and creation of new files, - * the listing operation correctly returns the remaining files without considering the pending rename. 
- * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testListCrashRecoveryWithPendingJsonFile() throws Exception { - AzureBlobFileSystem fs = null; - try { - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); - - fs.delete(path, true); - fs.create(new Path("/hbase/A1/file1.txt")); - fs.create(new Path("/hbase/A1/file2.txt")); - - FileStatus[] fileStatuses = fs.listStatus(path.getParent()); - - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 2 files") - .isEqualTo(2); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * Test case to verify crash recovery when no pending rename JSON file exists. - * - * This test simulates a scenario where there is no pending rename JSON file in the directory. - * It ensures that the listing operation correctly returns all files in the parent directory, including - * those created after the rename JSON file is deleted. - * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception { - AzureBlobFileSystem fs = null; - try { - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); - - fs.delete(renameJson, true); - fs.create(new Path("/hbase/A1/file1.txt")); - fs.create(new Path("/hbase/A1/file2.txt")); - - FileStatus[] fileStatuses = fs.listStatus(path.getParent()); - - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 3 files") - .isEqualTo(3); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * Test case to verify crash recovery when a pending rename JSON directory exists. 
- * - * This test simulates a scenario where a pending rename JSON directory exists, ensuring that the - * listing operation correctly returns all files in the parent directory without triggering a redo - * rename operation. It also checks that the directory with the suffix "-RenamePending.json" exists. - * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testListCrashRecoveryWithPendingJsonDir() throws Exception { - try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { - assumeBlobServiceType(); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs.mkdirs(renameJson); - - fs.create(new Path(path.getParent(), "file1.txt")); - fs.create(new Path(path, "file2.txt")); - - AtomicInteger redoRenameCall = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - redoRenameCall.incrementAndGet(); - return answer.callRealMethod(); - }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), - Mockito.anyInt(), Mockito.any(TracingContext.class)); - - FileStatus[] fileStatuses = fs.listStatus(path.getParent()); - - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 3 files") - .isEqualTo(3); - - Assertions.assertThat(redoRenameCall.get()) - .describedAs("No redo rename call should be made") - .isEqualTo(0); - - Assertions.assertThat( - Arrays.stream(fileStatuses) - .anyMatch(status -> renameJson.toUri().getPath().equals(status.getPath().toUri().getPath()))) - .describedAs("Directory with suffix -RenamePending.json should exist.") - .isTrue(); - } - } - - /** - * Test case to verify crash recovery during listing with multiple pending rename JSON files. - * - * This test simulates a scenario where multiple pending rename JSON files exist, ensuring that - * crash recovery properly handles the situation. 
It verifies that two redo rename calls are made - * and that the list operation returns the correct number of paths. - * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testListCrashRecoveryWithMultipleJsonFile() throws Exception { - AzureBlobFileSystem fs = null; - try { - Path path = new Path("/hbase/A1/A2"); - - // 1st Json file - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - - // 2nd Json file - Path path2 = new Path("/hbase/A1/A3"); - fs.create(new Path(path2, "file3.txt")); - - Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX); - VersionedFileStatus fileStatus - = (VersionedFileStatus) fs.getFileStatus(path2); - - new RenameAtomicity(path2, new Path("/hbase/test4"), - renameJson2, getTestTracingContext(fs, true), - fileStatus.getEtag(), client).preRename(); - - fs.create(new Path(path, "file2.txt")); - - AtomicInteger redoRenameCall = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - redoRenameCall.incrementAndGet(); - return answer.callRealMethod(); - }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), - Mockito.anyInt(), Mockito.any(TracingContext.class)); - - FileStatus[] fileStatuses = fs.listStatus(path.getParent()); - - Assertions.assertThat(fileStatuses.length) - .describedAs("List should return 2 paths") - .isEqualTo(2); - - Assertions.assertThat(redoRenameCall.get()) - .describedAs("2 redo rename calls should be made") - .isEqualTo(2); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * Test case to verify path status when a pending rename JSON file exists. - * - * This test simulates a scenario where a rename operation was pending, and ensures that - * the path status retrieval triggers a redo rename operation. The test also checks that - * the correct error code (`PATH_NOT_FOUND`) is returned. 
- * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testGetPathStatusWithPendingJsonFile() throws Exception { - AzureBlobFileSystem fs = null; - try { - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - fs = createJsonFile(path, renameJson); - - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - - fs.create(new Path("/hbase/A1/file1.txt")); - fs.create(new Path("/hbase/A1/file2.txt")); - - AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); - - AtomicInteger redoRenameCall = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - redoRenameCall.incrementAndGet(); - return answer.callRealMethod(); - }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), - Mockito.anyInt(), Mockito.any(TracingContext.class)); - - TracingContext tracingContext = new TracingContext( - conf.getClientCorrelationId(), fs.getFileSystemId(), - FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); - - AzureServiceErrorCode azureServiceErrorCode = intercept( - AbfsRestOperationException.class, () -> client.getPathStatus( - path.toUri().getPath(), true, - tracingContext, null)).getErrorCode(); - - Assertions.assertThat(azureServiceErrorCode.getErrorCode()) - .describedAs("Path had to be recovered from atomic rename operation.") - .isEqualTo(PATH_NOT_FOUND.getErrorCode()); - - Assertions.assertThat(redoRenameCall.get()) - .describedAs("There should be one redo rename call") - .isEqualTo(1); - } finally { - if (fs != null) { - fs.close(); - } - } - } - - /** - * Test case to verify path status when there is no pending rename JSON file. - * - * This test ensures that when no rename pending JSON file is present, the path status is - * successfully retrieved, the ETag is present, and no redo rename operation is triggered. 
- * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testGetPathStatusWithoutPendingJsonFile() throws Exception { - try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { - assumeBlobServiceType(); - - Path path = new Path("/hbase/A1/A2"); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - - fs.create(new Path(path, "file1.txt")); - fs.create(new Path(path, "file2.txt")); - - AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); - - AtomicInteger redoRenameCall = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - redoRenameCall.incrementAndGet(); - return answer.callRealMethod(); - }).when(client).getRedoRenameAtomicity( - Mockito.any(Path.class), Mockito.anyInt(), - Mockito.any(TracingContext.class)); - - TracingContext tracingContext = new TracingContext( - conf.getClientCorrelationId(), fs.getFileSystemId(), - FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, - null); - - AbfsHttpOperation abfsHttpOperation = client.getPathStatus( - path.toUri().getPath(), true, - tracingContext, null).getResult(); - - Assertions.assertThat(abfsHttpOperation.getStatusCode()) - .describedAs("Path should be found.") - .isEqualTo(HTTP_OK); - - Assertions.assertThat(extractEtagHeader(abfsHttpOperation)) - .describedAs("Etag should be present.") - .isNotNull(); - - Assertions.assertThat(redoRenameCall.get()) - .describedAs("There should be no redo rename call.") - .isEqualTo(0); - } - } - - /** - * Test case to verify path status when there is a pending rename JSON directory. - * - * This test simulates the scenario where a directory is created with a rename pending JSON - * file (indicated by a specific suffix). It ensures that the path is found, the ETag is present, - * and no redo rename operation is triggered. It also verifies that the rename pending directory - * exists. 
- * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testGetPathStatusWithPendingJsonDir() throws Exception { - try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { - assumeBlobServiceType(); - - Path path = new Path("/hbase/A1/A2"); - AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); - - fs.create(new Path(path, "file1.txt")); - fs.create(new Path(path, "file2.txt")); - - fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX)); - - AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); AtomicInteger redoRenameCall = new AtomicInteger(0); Mockito.doAnswer(answer -> { @@ -2499,71 +1651,6 @@ public void testGetPathStatusWithPendingJsonDir() throws Exception { } } - /** - * Test case to verify the behavior when the ETag of a file changes during a rename operation. - * - * This test simulates a scenario where the ETag of a file changes after the creation of a - * rename pending JSON file. The steps include: - * - Creating a rename pending JSON file with an old ETag. - * - Deleting the original directory for an ETag change. - * - Creating new files in the directory. - * - Verifying that the copy blob call is not triggered. - * - Verifying that the rename atomicity operation is called once. - * - * The test ensures that the system correctly handles the ETag change during the rename process. 
- * - * @throws Exception if any error occurs during the test execution - */ - @Test - public void testETagChangedDuringRename() throws Exception { - AzureBlobFileSystem fs = null; - try { - assumeBlobServiceType(); - Path path = new Path("/hbase/A1/A2"); - Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); - // Create rename pending json file with old etag - fs = createJsonFile(path, renameJson); - AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs); - fs.getAbfsStore().setClient(abfsBlobClient); - - // Delete the directory to change etag - fs.delete(path, true); - - fs.create(new Path(path, "file1.txt")); - fs.create(new Path(path, "file2.txt")); - AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0); - Mockito.doAnswer(copyBlob -> { - numberOfCopyBlobCalls.incrementAndGet(); - return copyBlob.callRealMethod(); - }) - .when(abfsBlobClient) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - - AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0); - Mockito.doAnswer(redoRenameAtomicity -> { - numberOfRedoRenameAtomicityCalls.incrementAndGet(); - return redoRenameAtomicity.callRealMethod(); - }) - .when(abfsBlobClient) - .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(), - Mockito.any(TracingContext.class)); - // Call list status to trigger rename redo - fs.listStatus(path.getParent()); - Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get()) - .describedAs("There should be one call to getRedoRenameAtomicity") - .isEqualTo(1); - Assertions.assertThat(numberOfCopyBlobCalls.get()) - .describedAs("There should be no copy blob call") - .isEqualTo(0); - } finally { - if (fs != null) { - fs.close(); - } - } - } - /** * Test to verify the idempotency of the `rename` operation in Azure Blob File System when retrying * after a failure. 
The test simulates a "path not found" error (HTTP 404) on the first attempt, @@ -2588,7 +1675,7 @@ public void testRenamePathRetryIdempotency() throws Exception { Path destFilePath = new Path(sourceDir, "file2"); final List<AbfsHttpHeader> headers = new ArrayList<>(); - mockRetriedRequest(abfsClient, headers); + mockRetriedRequest(abfsClient, headers, 0); AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class); AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); @@ -2612,7 +1699,9 @@ public void testRenamePathRetryIdempotency() throws Exception { Mockito.nullable(String.class), Mockito.nullable(Boolean.class), Mockito.nullable(TracingContext.class), Mockito.nullable(ContextEncryptionAdapter.class)); - fs.rename(sourceFilePath, destFilePath); + Assertions.assertThat(fs.rename(sourceFilePath, destFilePath)) + .describedAs("Rename should succeed.") + .isTrue(); } } @@ -2672,7 +1761,7 @@ public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception { fs.getAbfsStore().setClient(abfsDfsClient); final String[] clientTransactionId = new String[1]; mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); - mockRetriedRequest(abfsDfsClient, new ArrayList<>()); + mockRetriedRequest(abfsDfsClient, new ArrayList<>(), 1); int[] flag = new int[1]; Mockito.doAnswer(getPathStatus -> { if (flag[0] == 1) { @@ -2703,41 +1792,569 @@ public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception { } /** - * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts - * the Abfs operation and simulates an HTTP conflict (HTTP 404) error on the - * first invocation. It creates a mock HTTP operation with a PUT method and - * specific status codes and error messages. + * Tests renaming a directory when the destination parent directory does not exist. + * The test verifies that the rename operation fails as expected. * - * @param abfsDfsClient The AbfsDfsClient to mock operations for. 
- * @param headers The list of HTTP headers to which request headers will be added. + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testRenameWithDestParentNotExist() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + Path src = new Path("/A1/A2"); + Path dst = new Path("/A3/A4"); + fs.create(new Path(src, "file.txt")); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should fail as destination parent not exist.") + .isFalse(); + } + } + + /** + * Tests renaming a directory when the destination parent directory is the root. + * The test verifies that the rename operation succeeds as expected. * - * @throws Exception If an error occurs during mock creation or operation execution. + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testRenameWithDestParentAsRoot() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + Path src = new Path("/A1/A2"); + Path dst = new Path("/A3"); + fs.create(new Path(src, "file.txt")); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path(dst, "file.txt"))) + .describedAs("File should exist in destination directory.") + .isTrue(); + } + } + + /** + * Tests renaming a file when the destination is root. + * The test verifies that the rename operation succeeds as expected. 
+ * + * @throws Exception if an error occurs during the test execution */ - private void mockRetriedRequest(AbfsDfsClient abfsDfsClient, - final List<AbfsHttpHeader> headers) throws Exception { - TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient, - new MockIntercept<AbfsRestOperation>() { - private int count = 0; - - @Override - public void answer(final AbfsRestOperation mockedObj, - final InvocationOnMock answer) - throws AbfsRestOperationException { - if (count == 0) { - count = 1; - AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); - Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod(); - Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage(); - Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op) - .getStorageErrorCode(); - Mockito.doReturn(true).when(mockedObj).hasResult(); - Mockito.doReturn(op).when(mockedObj).getResult(); - Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode(); - headers.addAll(mockedObj.getRequestHeaders()); - throw new AbfsRestOperationException(HTTP_NOT_FOUND, - SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op); - } - } - }, 1); - } + @Test + public void testFileRenameWithDestAsRoot() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + Path src = new Path("/A1/A2/file.txt"); + Path dst = new Path("/"); + fs.create(src); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path(dst, "file.txt"))) + .describedAs("File should exist in root.") + .isTrue(); + } + } + + /** + * Tests renaming a directory when the destination is root. + * The test verifies that the rename operation succeeds as expected. 
+ * + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testDirRenameWithDestAsRoot() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + Path src = new Path("/A1/A2"); + Path dst = new Path("/"); + fs.create(new Path(src, "file.txt")); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path(dst, src.getName()))) + .describedAs("A2 directory should exist in root.") + .isTrue(); + } + } + + /** + * Tests the rename operation with multiple directories in the source path. + * This test verifies that the rename operation correctly handles + * multiple directories and files, ensuring that the source directory + * is renamed to the destination path. + * + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testRenameWithMultipleDirsInSource() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + assumeBlobServiceType(); + fs.mkdirs(new Path("/testDir/dir1")); + for (int i = 0; i < 10; i++) { + fs.create(new Path("/testDir/dir1/file" + i)); + } + fs.mkdirs(new Path("/testDir/dir2")); + fs.create(new Path("/testDir/dir2/file2")); + createAzCopyFolder(new Path("/testDir/dir3")); + Assertions.assertThat(fs.rename(new Path("/testDir"), + new Path("/testDir2"))) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir"))) + .describedAs("Old directory should not exist.") + .isFalse(); + Assertions.assertThat(fs.exists(new Path("/testDir2"))) + .describedAs("New directory should exist.") + .isTrue(); + } + } + + /** + * Tests the rename operation with multiple implicit directories in the source path. + * This test verifies that the rename operation correctly handles + * multiple directories and files, ensuring that the source directory + * is renamed to the destination path. 
+ * + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testRenameWithMultipleImplicitDirsInSource() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + assumeBlobServiceType(); + createAzCopyFolder(new Path("/testDir/dir1")); + for (int i = 0; i < 10; i++) { + createAzCopyFile(new Path("/testDir/dir1/file" + i)); + } + createAzCopyFolder(new Path("/testDir/dir2")); + createAzCopyFile(new Path("/testDir/dir2/file2")); + createAzCopyFolder(new Path("/testDir/dir3")); + Assertions.assertThat(fs.rename(new Path("/testDir"), + new Path("/testDir2"))) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir"))) + .describedAs("Old directory should not exist.") + .isFalse(); + Assertions.assertThat(fs.exists(new Path("/testDir2"))) + .describedAs("New directory should exist.") + .isTrue(); + } + } + + /** + * Tests renaming a directory with an explicit directory in the source path. + * This test verifies that the rename operation correctly handles + * the explicit directory and files, ensuring that the source directory + * is renamed to the destination path. 
+ * + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testRenameWithExplicitDirInSource() throws Exception { + try (AzureBlobFileSystem fs = this.getFileSystem()) { + assumeBlobServiceType(); + fs.create(new Path("/testDir/dir3/file2")); + fs.create(new Path("/testDir/dir3/file1")); + Assertions.assertThat(fs.rename(new Path("/testDir"), + new Path("/testDir2"))) + .describedAs("Rename should succeed.") + .isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir"))) + .describedAs("Old directory should not exist.") + .isFalse(); + Assertions.assertThat(fs.exists(new Path("/testDir2"))) + .describedAs("New directory should exist.") + .isTrue(); + } + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. + * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Simulates a rename failure, performs a recovery action, and verifies that the "RenamePendingJson" + * file is deleted. It checks that the rename operation is successfully completed after recovery. 
+ * + * @param fs the AzureBlobFileSystem instance + * @param client the AbfsBlobClient instance + * @param srcPath the source path for the rename operation + * @param recoveryCallable the recovery action to perform + * @throws Exception if an error occurs during recovery or verification + */ + private void crashRenameAndRecover(final AzureBlobFileSystem fs, + AbfsBlobClient client, + final String srcPath, + final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryCallable) + throws Exception { + crashRename(fs, client, srcPath); + AzureBlobFileSystem fs2 = Mockito.spy(getFileSystem()); + fs2.setWorkingDirectory(new Path(ROOT_PATH)); + client = (AbfsBlobClient) addSpyHooksOnClient(fs2); + int[] renameJsonDeleteCounter = new int[1]; + Mockito.doAnswer(answer -> { + if ((ROOT_PATH + srcPath + SUFFIX) + .equalsIgnoreCase(((Path) answer.getArgument(0)).toUri().getPath())) { + renameJsonDeleteCounter[0] = 1; + } + return answer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + recoveryCallable.apply(fs2); + Assertions.assertThat(renameJsonDeleteCounter[0]) + .describedAs("RenamePendingJson should be deleted") + .isEqualTo(1); + //List would complete the rename orchestration. + assertFalse(fs2.exists(new Path("hbase/test1/test2"))); + assertFalse(fs2.exists(new Path("hbase/test1/test2/test3"))); + assertTrue(fs2.exists(new Path("hbase/test4/test2/test3"))); + assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file"))); + assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file"))); + assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file1"))); + assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file1"))); + } + + /** + * Simulates a rename failure by triggering an `AbfsRestOperationException` during the rename process. + * It intercepts the exception and ensures that all leases acquired during the atomic rename are released. 
+ * + * @param fs the AzureBlobFileSystem instance used for the rename operation + * @param client the AbfsBlobClient instance used for mocking the rename failure + * @param srcPath the source path for the rename operation + * @throws Exception if an error occurs during the simulated failure or lease release + */ + private void crashRename(final AzureBlobFileSystem fs, + final AbfsBlobClient client, + final String srcPath) throws Exception { + BlobRenameHandler[] blobRenameHandlers = new BlobRenameHandler[1]; + AbfsClientTestUtil.mockGetRenameBlobHandler(client, + blobRenameHandler -> { + blobRenameHandlers[0] = blobRenameHandler; + return null; + }); + //Fail rename orchestration on path hbase/test1/test2/test3/file1 + Mockito.doThrow(new AbfsRestOperationException(HTTP_FORBIDDEN, "", "", + new Exception())) + .when(client) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + LambdaTestUtils.intercept(AccessDeniedException.class, () -> { + fs.rename(new Path(srcPath), + new Path("hbase/test4")); + }); + //Release all the leases taken by atomic rename orchestration + List<AbfsLease> leases = new ArrayList<>(blobRenameHandlers[0].getLeases()); + for (AbfsLease lease : leases) { + lease.free(); + } + } + + /** + * A helper method to set up the test environment and execute the common logic for handling + * failed rename operations and recovery in HBase. This method performs the necessary setup + * (creating directories and files) and then triggers the `crashRenameAndRecover` method + * with a provided recovery action. + * This method is used by different tests that require different recovery actions, such as + * performing `listStatus` or checking the existence of a path after a failed rename. 
+ * + * @param fs the AzureBlobFileSystem instance to be used in the test + * @param client the AbfsBlobClient instance to be used in the test + * @param srcPath the source path for the rename operation + * @param failedCopyPath the path that simulates a failed copy during rename + * @param recoveryAction the specific recovery action to be performed after the rename failure + * (e.g., listing directory status or checking path existence) + * @throws Exception if any error occurs during setup or execution of the recovery action + */ + private void setupAndTestHBaseFailedRenameRecovery( + final AzureBlobFileSystem fs, + final AbfsBlobClient client, + final String srcPath, + final String failedCopyPath, + final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryAction) + throws Exception { + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path(srcPath)); + fs.mkdirs(new Path(srcPath, "test3")); + fs.create(new Path(srcPath + "/test3/file")); + fs.create(new Path(failedCopyPath)); + fs.mkdirs(new Path("hbase/test4/")); + fs.create(new Path("hbase/test4/file1")); + crashRenameAndRecover(fs, client, srcPath, recoveryAction); + } + + /** + * Tests renaming a directory in AzureBlobFileSystem when the creation of the "RenamePendingJson" + * file fails on the first attempt. It ensures the renaming operation is retried. + * The test verifies that the creation of the "RenamePendingJson" file is attempted twice: + * once on failure and once on retry. 
+ * + * @param fs the AzureBlobFileSystem instance for the test + * @throws Exception if an error occurs during the test + */ + private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) + throws Exception { + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + Path src = new Path("hbase/test1/test2"); + Path dest = new Path("hbase/test4"); + fs.mkdirs(src); + fs.mkdirs(new Path(src, "test3")); + final int[] renamePendingJsonWriteCounter = new int[1]; + /* + * Fail the creation of RenamePendingJson file on the first attempt. + */ + Answer renamePendingJsonCreateAns = createAnswer -> { + Path path = createAnswer.getArgument(0); + Mockito.doAnswer(clientFlushAns -> { + if (renamePendingJsonWriteCounter[0]++ == 0) { + fs.delete(path, true); + } + return clientFlushAns.callRealMethod(); + }) + .when(client) + .flush(Mockito.any(byte[].class), Mockito.anyString(), + Mockito.anyBoolean(), Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.anyString(), + Mockito.nullable(ContextEncryptionAdapter.class), + Mockito.any(TracingContext.class)); + return createAnswer.callRealMethod(); + }; + RenameAtomicityTestUtils.addCreatePathMock(client, + renamePendingJsonCreateAns); + fs.rename(src, dest); + Assertions.assertThat(renamePendingJsonWriteCounter[0]) + .describedAs("Creation of RenamePendingJson should be attempted twice") + .isEqualTo(2); + } + + /** + * Tests the behavior of the redo operation when an invalid "RenamePendingJson" file exists. + * It verifies that the file is deleted and that no copy operation is performed. + * The test simulates a scenario where the "RenamePendingJson" file is partially written and + * ensures that the `redo` method correctly deletes the file and does not trigger a copy operation. 
+ * + * @param fs the AzureBlobFileSystem instance for the test + * @throws Exception if an error occurs during the test + */ + private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs) + throws Exception { + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + Path path = new Path("/hbase/test1/test2"); + fs.mkdirs(new Path(path, "test3")); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + OutputStream os = fs.create(renameJson); + os.write("{".getBytes(StandardCharsets.UTF_8)); + os.close(); + int[] renameJsonDeleteCounter = new int[1]; + Mockito.doAnswer(deleteAnswer -> { + Path ansPath = deleteAnswer.getArgument(0); + if (renameJson.toUri() + .getPath() + .equalsIgnoreCase(ansPath.toUri().getPath())) { + renameJsonDeleteCounter[0]++; + } + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + new RenameAtomicity(renameJson, 1, + getTestTracingContext(fs, true), null, client).redo(); + Assertions.assertThat(renameJsonDeleteCounter[0]) + .describedAs("RenamePendingJson should be deleted") + .isEqualTo(1); + Mockito.verify(client, Mockito.times(0)).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Mocks the progress status for a copy blob operation. + * This method simulates a copy operation that is pending and not yet completed. + * It intercepts the `copyBlob` method and modifies its response to return a "COPY_STATUS_PENDING" + * status for the copy operation. + * + * @param spiedClient The {@link AbfsBlobClient} instance that is being spied on. + * @throws AzureBlobFileSystemException if the mock setup fails. 
+ */ + private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient spiedClient) + throws AzureBlobFileSystemException { + Mockito.doAnswer(answer -> { + AbfsRestOperation op = Mockito.spy( + (AbfsRestOperation) answer.callRealMethod()); + AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); + Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader( + HttpHeaderConfigurations.X_MS_COPY_STATUS); + Mockito.doReturn(httpOp).when(op).getResult(); + return op; + }) + .when(spiedClient) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + } + + /** + * Mocks the final status of a blob copy operation. + * This method ensures that when checking the status of a copy operation in progress, + * it returns the specified final status (e.g., success, failure, aborted). + * + * @param spiedClient The mocked Azure Blob client to apply the mock behavior. + * @param requiredCopyFinalStatus The final status of the copy operation to be returned + * (e.g., COPY_STATUS_FAILED, COPY_STATUS_ABORTED). 
+ */ + private void addMockForCopyOperationFinalStatus(final AbfsBlobClient spiedClient, + final String requiredCopyFinalStatus) { + AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient, + blobRenameHandler -> { + Mockito.doAnswer(onHandleCopyInProgress -> { + Path handlePath = onHandleCopyInProgress.getArgument(0); + TracingContext tracingContext = onHandleCopyInProgress.getArgument( + 1); + Mockito.doAnswer(onStatusCheck -> { + AbfsRestOperation op = Mockito.spy( + (AbfsRestOperation) onStatusCheck.callRealMethod()); + AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); + Mockito.doReturn(requiredCopyFinalStatus) + .when(httpOp) + .getResponseHeader( + HttpHeaderConfigurations.X_MS_COPY_STATUS); + Mockito.doReturn(httpOp).when(op).getResult(); + return op; + }) + .when(spiedClient) + .getPathStatus(handlePath.toUri().getPath(), + tracingContext, null, false); + return onHandleCopyInProgress.callRealMethod(); + }) + .when(blobRenameHandler) + .handleCopyInProgress(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.any(String.class)); + return null; + }); + } + + /** + * Helper method to configure the AzureBlobFileSystem and rename directories. + * + * @param currentFs The current AzureBlobFileSystem to use for renaming. + * @param producerQueueSize Maximum size of the producer queue. + * @param consumerMaxLag Maximum lag allowed for the consumer. + * @param maxThread Maximum threads for the rename operation. + * @param src The source path of the directory to rename. + * @param dst The destination path of the renamed directory. + * @throws IOException If an I/O error occurs during the operation. 
+ */ + private void renameDir(AzureBlobFileSystem currentFs, String producerQueueSize, + String consumerMaxLag, String maxThread, Path src, Path dst) + throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize); + config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread); + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) { + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + } + } + + /** + * Performs the rename operation and validates the existence of the directories and files. + * + * @param fs the AzureBlobFileSystem instance + * @param src the source path to be renamed + * @param dst the destination path for the rename + * @param fileName the name of the file to be renamed + */ + private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path dst, String fileName) + throws IOException { + // Assert the source directory exists + Assertions.assertThat(fs.exists(src)) + .describedAs("Old directory should exist before rename") + .isTrue(); + + // Perform rename + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + + // Assert the destination directory and file exist after rename + Assertions.assertThat(fs.exists(new Path(dst, fileName))) + .describedAs("Rename should be successful") + .isTrue(); + + // Assert the source directory no longer exists + Assertions.assertThat(fs.exists(src)) + .describedAs("Old directory should not exist") + .isFalse(); + + // Assert the new destination directory exists + Assertions.assertThat(fs.exists(dst)) + .describedAs("New directory should exist") + .isTrue(); + } + + /** + * Validates the atomic rename key for a specific path. 
+ * + * @param abfsBlobClient the AbfsBlobClient instance + * @param path the path to check for atomic rename key + * @param expected the expected value (true or false) + */ + private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String path, boolean expected) { + Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path)) + .describedAs("Atomic rename key check for path: " + path) + .isEqualTo(expected); + } + + /** + * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts + * the Abfs operation and simulates an HTTP Not Found (HTTP 404) error on the + * first invocation. It creates a mock HTTP operation with a PUT method and + * specific status codes and error messages. + * + * @param abfsDfsClient The AbfsDfsClient to mock operations for. + * @param headers The list of HTTP headers to which request headers will be added. + * + * @throws Exception If an error occurs during mock creation or operation execution. + */ + private void mockRetriedRequest(AbfsDfsClient abfsDfsClient, + final List<AbfsHttpHeader> headers, int failedCall) throws Exception { + TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient, + new MockIntercept<AbfsRestOperation>() { + private int count = 0; + + @Override + public void answer(final AbfsRestOperation mockedObj, + final InvocationOnMock answer) + throws AbfsRestOperationException { + if (count == 0) { + count = 1; + AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod(); + Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage(); + Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op) + .getStorageErrorCode(); + Mockito.doReturn(true).when(mockedObj).hasResult(); + Mockito.doReturn(op).when(mockedObj).getResult(); + Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode(); + headers.addAll(mockedObj.getRequestHeaders()); + throw new AbfsRestOperationException(HTTP_NOT_FOUND, + SOURCE_PATH_NOT_FOUND.getErrorCode(),
EMPTY_STRING, null, op); + } + } + }, failedCall); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java new file mode 100644 index 00000000000..e4d9f826000 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java @@ -0,0 +1,770 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static 
org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error once FAILED_CALL delete operations have succeeded and verifies the behavior.
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + renameOperationWithRecovery(fs, src, dst, deleteCall); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + // Assertions to validate source and dest status before rename + validateRename(fs, src, dst, true, true, false); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since the destination path already exists, there will be an adjustment in the + * destination path. After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameRecoveryWithMarkerPresentInDest() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Test to check behaviour when rename is called on an atomic rename directory + * for which rename pending json file is already present.
+ * @throws Exception in case of failure + */ + @Test + public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Test case to verify crash recovery with a single child folder. + * + * This test simulates a scenario where a pending rename JSON file exists for a single child folder + * under the parent directory. It ensures that when listing the files in the parent directory, + * only the child folder is returned, and no additional files are listed. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithSingleChildFolder() throws Exception { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 0 file") + .isEqualTo(0); + assertPendingJsonFile(fs, renameJson, fileStatuses, path, false); + } + + /** + * Test case to verify crash recovery with multiple child folders. + * + * This test simulates a scenario where a pending rename JSON file exists, and multiple files are + * created in the parent directory. It ensures that when listing the files in the parent directory, + * the correct number of files is returned. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithMultipleChildFolder() throws Exception { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 2 files") + .isEqualTo(2); + assertPendingJsonFile(fs, renameJson, fileStatuses, path, false); + } + + /** + * Test case to verify crash recovery with a pending rename JSON file. + * + * This test simulates a scenario where a pending rename JSON file exists in the parent directory, + * and it ensures that after the deletion of the target directory and creation of new files, + * the listing operation correctly returns the remaining files. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithPendingJsonFile() throws Exception { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + + fs.delete(path, true); + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 2 files") + .isEqualTo(2); + assertPendingJsonFile(fs, renameJson, fileStatuses, path, false); + } + + /** + * Test case to verify crash recovery when no pending rename JSON file exists. + * + * This test simulates a scenario where there is no pending rename JSON file in the directory. 
+ * It ensures that the listing operation correctly returns all files in the parent directory. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + + fs.delete(renameJson, true); + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 3 files") + .isEqualTo(3); + // Pending json file not present, no recovery take place, so source directory should exist. + assertPendingJsonFile(fs, renameJson, fileStatuses, path, true); + } + + /** + * Test case to verify crash recovery when a pending rename JSON directory exists. + * + * This test simulates a scenario where a pending rename JSON directory exists, ensuring that the + * listing operation correctly returns all files in the parent directory without triggering a redo + * rename operation. It also checks that the directory with the suffix "-RenamePending.json" exists. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithPendingJsonDir() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs.mkdirs(renameJson); + + fs.create(new Path(path.getParent(), "file1.txt")); + fs.create(new Path(path, "file2.txt")); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 3 files") + .isEqualTo(3); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("No redo rename call should be made") + .isEqualTo(0); + + Assertions.assertThat( + Arrays.stream(fileStatuses) + .anyMatch(status -> renameJson.toUri().getPath().equals(status.getPath().toUri().getPath()))) + .describedAs("Directory with suffix -RenamePending.json should exist.") + .isTrue(); + } + } + + /** + * Test case to verify crash recovery during listing with multiple pending rename JSON files. + * + * This test simulates a scenario where multiple pending rename JSON files exist, ensuring that + * crash recovery properly handles the situation. It verifies that two redo rename calls are made + * and that the list operation returns the correct number of paths. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithMultipleJsonFile() throws Exception { + Path path = new Path("/hbase/A1/A2"); + + // 1st Json file + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + // 2nd Json file + Path path2 = new Path("/hbase/A1/A3"); + fs.create(new Path(path2, "file3.txt")); + + Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX); + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path2); + + new RenameAtomicity(path2, new Path("/hbase/test4"), + renameJson2, getTestTracingContext(fs, true), + fileStatus.getEtag(), client).preRename(); + + fs.create(new Path(path, "file2.txt")); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 0 paths") + .isEqualTo(0); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("2 redo rename calls should be made") + .isEqualTo(2); + assertPathStatus(fs, path, false, + "Source directory should not exist."); + assertPathStatus(fs, new Path("/hbase/test4/file.txt"), true, + "File in destination directory should exist."); + assertPathStatus(fs, path2, false, + "Source directory should not exist"); + assertPathStatus(fs, new Path("/hbase/test4/file2.txt"), true, + "File in destination directory should exist."); + assertPathStatus(fs, renameJson, false, + "Rename Pending Json file should not exist."); + assertPathStatus(fs, renameJson2, false, + "Rename Pending Json 
file should not exist."); + } + + /** + * Test case to verify path status when a pending rename JSON file exists. + * + * This test simulates a scenario where a rename operation was pending, and ensures that + * the path status retrieval triggers a redo rename operation. The test also checks that + * the correct error code (`PATH_NOT_FOUND`) is returned. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testGetPathStatusWithPendingJsonFile() throws Exception { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + TracingContext tracingContext = new TracingContext( + conf.getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + + AzureServiceErrorCode azureServiceErrorCode = intercept( + AbfsRestOperationException.class, () -> client.getPathStatus( + path.toUri().getPath(), true, + tracingContext, null)).getErrorCode(); + + Assertions.assertThat(azureServiceErrorCode.getErrorCode()) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND.getErrorCode()); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("There should be one redo rename call") + .isEqualTo(1); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should not exist.") 
+ .isFalse(); + } + + /** + * Test case to verify the behavior when the ETag of a file changes during a rename operation. + * + * This test simulates a scenario where the ETag of a file changes after the creation of a + * rename pending JSON file. The steps include: + * - Creating a rename pending JSON file with an old ETag. + * - Deleting the original directory for an ETag change. + * - Creating new files in the directory. + * - Verifying that the copy blob call is not triggered. + * - Verifying that the rename atomicity operation is called once. + * + * The test ensures that the system correctly handles the ETag change during the rename process. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testETagChangedDuringRename() throws Exception { + assumeBlobServiceType(); + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + // Create rename pending json file with old etag + AzureBlobFileSystem fs = createJsonFile(path, renameJson); + AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(abfsBlobClient); + + // Delete the directory to change etag + fs.delete(path, true); + + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); + AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0); + Mockito.doAnswer(copyBlob -> { + numberOfCopyBlobCalls.incrementAndGet(); + return copyBlob.callRealMethod(); + }) + .when(abfsBlobClient) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0); + Mockito.doAnswer(redoRenameAtomicity -> { + numberOfRedoRenameAtomicityCalls.incrementAndGet(); + return redoRenameAtomicity.callRealMethod(); + }) + .when(abfsBlobClient) + .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(), + 
Mockito.any(TracingContext.class)); + // Call list status to trigger rename redo + fs.listStatus(path.getParent()); + Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get()) + .describedAs("There should be one call to getRedoRenameAtomicity") + .isEqualTo(1); + Assertions.assertThat(numberOfCopyBlobCalls.get()) + .describedAs("There should be no copy blob call") + .isEqualTo(0); + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should not exist.") + .isFalse(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. + */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to rename. 
+ * @param dst The destination path for the rename operation. + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AzureBlobFileSystem fs, Path src, Path dst, + AbfsBlobClient client, AtomicInteger copyCall) + throws Exception { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + renameOperationWithRecovery(fs, src, dst, copyCall); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. + */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. 
+ * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. + * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. 
+ */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create a json file. + * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Helper method to perform the rename operation 
and validate the results. + * + * @param fs The AzureBlobFileSystem instance to use for the rename operation. + * @param src The source path (directory). + * @param dst The destination path (directory). + * @param countCall The AtomicInteger to track the number of operations. + * @throws Exception If an error occurs during the rename operation. + */ + private void renameOperationWithRecovery(AzureBlobFileSystem fs, Path src, + Path dst, AtomicInteger countCall) throws Exception { + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(countCall.get()) + .describedAs("Operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + countCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(countCall.get()) + .describedAs("Operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + + /** + * Helper method to assert that the pending JSON file does not exist + * and that the list of file statuses does not contain the rename pending JSON file. + * + * @param fs The AzureBlobFileSystem instance. + * @param renameJson The path of the rename pending JSON file. + * @param fileStatuses The array of FileStatus objects to check. + * @param srcPath The source path to check. + * @throws Exception If an error occurs during the assertion. 
+ */ + private void assertPendingJsonFile(AzureBlobFileSystem fs, + Path renameJson, FileStatus[] fileStatuses, + Path srcPath, boolean isSrcPathExist) throws Exception { + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should not exist.") + .isFalse(); + + Assertions.assertThat( + Arrays.stream(fileStatuses) + .anyMatch(status -> + renameJson.toUri().getPath() + .equals(status.getPath().toUri().getPath()))) + .describedAs( + "List status should not contains any file with suffix -RenamePending.json.") + .isFalse(); + + Assertions.assertThat( + Arrays.stream(fileStatuses) + .anyMatch(status -> + srcPath.toUri().getPath() + .equals(status.getPath().toUri().getPath()))) + .describedAs( + "List status should not contains source path.") + .isEqualTo(isSrcPathExist); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index a6652fe85f1..9af8f0f5a6f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -352,7 +352,7 @@ public static void mockGetRenameBlobHandler(AbfsBlobClient blobClient, Mockito.doAnswer(answer1 -> { functionRaisingIOE.apply(blobRenameHandler); return answer1.callRealMethod(); - }).when(blobRenameHandler).execute(); + }).when(blobRenameHandler).execute(Mockito.anyBoolean()); return blobRenameHandler; }) .when(blobClient) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java index 9faa54e7ba1..d6b11865130 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.fs.azurebfs.utils; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @@ -25,10 +32,13 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.services.AuthType; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SECURE_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONTAINER_PREFIX; @@ -40,6 +50,8 @@ public final class AbfsTestUtils extends AbstractAbfsIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(AbfsTestUtils.class); + private static final int TOTAL_THREADS_IN_POOL = 5; + public AbfsTestUtils() throws Exception { super(); } @@ -95,4 +107,29 @@ public static void disableFilesystemCaching(Configuration conf) { conf.setBoolean(String.format("fs.%s.impl.disable.cache", ABFS_SCHEME), true); conf.setBoolean(String.format("fs.%s.impl.disable.cache", ABFS_SECURE_SCHEME), true); } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. 
+ * @param path The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path path, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = + Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(path, FILE + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org