This is an automated email from the ASF dual-hosted git repository. anujmodi pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new c293424de85 HADOOP-19497: [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint (#7509) (#7612) c293424de85 is described below commit c293424de85bd06d685bdc5b2f62ee999c70a799 Author: Manish Bhatt <52626736+bhattmanis...@users.noreply.github.com> AuthorDate: Tue Apr 15 07:42:01 2025 -0700 HADOOP-19497: [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint (#7509) (#7612) Contributed by Manish Bhatt Reviewed by Anmol Asrani, Anuj Modi, Manika Joshi Signed off by: Anuj Modi <anujm...@apache.org> --- .../fs/azurebfs/constants/AbfsHttpConstants.java | 5 +- .../constants/FileSystemConfigurations.java | 2 +- .../hadoop/fs/azurebfs/services/AbfsClient.java | 6 - .../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 445 ++++++++++++++------- .../hadoop/fs/azurebfs/services/AbfsErrors.java | 2 +- .../azurebfs/ITestAzureBlobFileSystemCreate.java | 2 +- .../azurebfs/ITestAzureBlobFileSystemRename.java | 9 +- .../fs/azurebfs/services/TestAbfsClient.java | 9 +- .../services/TestAbfsRenameRetryRecovery.java | 23 +- 9 files changed, 332 insertions(+), 171 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index b2eafd63cb1..e443efa8b83 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -199,13 +199,10 @@ public String toString() { } public static ApiVersion getCurrentVersion() { - return DEC_12_2019; + return NOV_04_2024; } } - @Deprecated - public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString(); - /** * List of Constants Used by Blob Endpoint Rest APIs.
*/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 6072fcbb6fa..ea982f8a35f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -198,7 +198,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS; - public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true; private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index b1344f2fcdb..c198a423dcb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -218,7 +218,6 @@ private AbfsClient(final URL baseUrl, this.encryptionContextProvider = encryptionContextProvider; // Version update needed to support x-ms-encryption-context header // @link https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id} - xMsVersion = ApiVersion.AUG_03_2023; // will be default once server change deployed encryptionType = EncryptionType.ENCRYPTION_CONTEXT; } else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) { clientProvidedEncryptionKey = @@ -228,11 +227,6 @@ private AbfsClient(final URL baseUrl, encryptionType = EncryptionType.GLOBAL_KEY; } - // Version update needed 
to support x-ms-client-transaction-id header - if (abfsConfiguration.getIsClientTransactionIdEnabled()) { - xMsVersion = ApiVersion.NOV_04_2024; - } - String sslProviderName = null; if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 0e988162a08..7c1bf83f27f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -678,151 +678,15 @@ public AbfsClientRenameResult renamePath( final TracingContext tracingContext, String sourceEtag, boolean isMetadataIncompleteState) throws IOException { - final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - - final boolean hasEtag = !isEmpty(sourceEtag); - - boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled(); - if (!hasEtag && shouldAttemptRecovery) { - // in case eTag is already not supplied to the API - // and rename resilience is expected and it is an HNS enabled account - // fetch the source etag to be used later in recovery - try { - final AbfsRestOperation srcStatusOp = getPathStatus(source, - false, tracingContext, null); - if (srcStatusOp.hasResult()) { - final AbfsHttpOperation result = srcStatusOp.getResult(); - sourceEtag = extractEtagHeader(result); - // and update the directory status. 
- boolean isDir = checkIsDir(result); - shouldAttemptRecovery = !isDir; - LOG.debug( - "Retrieved etag of source for rename recovery: {}; isDir={}", - sourceEtag, isDir); - } - } catch (AbfsRestOperationException e) { - throw new AbfsRestOperationException(e.getStatusCode(), - SOURCE_PATH_NOT_FOUND.getErrorCode(), - e.getMessage(), e); - } - - } - - String encodedRenameSource = urlEncode( - FORWARD_SLASH + this.getFileSystem() + source); - if (getAuthType() == AuthType.SAS) { - final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); - appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, - srcQueryBuilder); - encodedRenameSource += srcQueryBuilder.toString(); - } - - LOG.trace("Rename source queryparam added {}", encodedRenameSource); - requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); - requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); - - // Add the client transaction ID to the request headers. - String clientTransactionId = addClientTransactionIdToHeader(requestHeaders); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); - appendSASTokenToQuery(destination, - SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(destination, - abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); - try { - incrementAbfsRenamePath(); - op.execute(tracingContext); - // AbfsClientResult contains the AbfsOperation, If recovery happened or - // not, and the incompleteMetaDataState is true or false. - // If we successfully rename a path and isMetadataIncompleteState was - // true, then rename was recovered, else it didn't, this is why - // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). 
- return new AbfsClientRenameResult(op, isMetadataIncompleteState, - isMetadataIncompleteState); - } catch (AzureBlobFileSystemException e) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { - throw e; - } - - // recovery using client transaction id only if it is a retried request. - if (op.isARetriedRequest() && clientTransactionId != null - && SOURCE_PATH_NOT_FOUND.getErrorCode().equalsIgnoreCase( - op.getResult().getStorageErrorCode())) { - try { - final AbfsHttpOperation abfsHttpOperation = - getPathStatus(destination, false, - tracingContext, null).getResult(); - if (clientTransactionId.equals( - abfsHttpOperation.getResponseHeader( - X_MS_CLIENT_TRANSACTION_ID))) { - return new AbfsClientRenameResult( - getSuccessOp(AbfsRestOperationType.RenamePath, - HTTP_METHOD_PUT, url, requestHeaders), true, - isMetadataIncompleteState); - } - } catch (AzureBlobFileSystemException exception) { - throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception); - } - throw e; - } - - // ref: HADOOP-19393. Write permission checks can occur before validating - // rename operation's validity. If there is an existing destination path, it may be rejected - // with an authorization error. Catching and throwing FileAlreadyExistsException instead. - if (op.getResult().getStorageErrorCode() - .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())) { - throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS); - } - - // ref: HADOOP-18242. Rename failure occurring due to a rare case of - // tracking metadata being in incomplete state. 
- if (op.getResult().getStorageErrorCode() - .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) - && !isMetadataIncompleteState) { - //Logging - ABFS_METADATA_INCOMPLETE_RENAME_FAILURE - .info( - "Rename Failure attempting to resolve tracking metadata state and retrying."); - // rename recovery should be attempted in this case also - shouldAttemptRecovery = true; - isMetadataIncompleteState = true; - String sourceEtagAfterFailure = sourceEtag; - if (isEmpty(sourceEtagAfterFailure)) { - // Doing a HEAD call resolves the incomplete metadata state and - // then we can retry the rename operation. - AbfsRestOperation sourceStatusOp = getPathStatus(source, false, - tracingContext, null); - isMetadataIncompleteState = true; - // Extract the sourceEtag, using the status Op, and set it - // for future rename recovery. - AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); - sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); - } - renamePath(source, destination, continuation, tracingContext, - sourceEtagAfterFailure, isMetadataIncompleteState); - } - // if we get out of the condition without a successful rename, then - // it isn't metadata incomplete state issue. - isMetadataIncompleteState = false; - - // setting default rename recovery success to false - boolean etagCheckSucceeded = false; - if (shouldAttemptRecovery) { - etagCheckSucceeded = renameIdempotencyCheckOp( - source, - sourceEtag, op, destination, tracingContext); - } - if (!etagCheckSucceeded) { - // idempotency did not return different result - // throw back the exception - throw e; - } - return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); + // Rename with client transaction id if namespace & client transaction id is enabled. 
+ if (getIsNamespaceEnabled() + && getAbfsConfiguration().getIsClientTransactionIdEnabled()) { + return renameWithCTIdRecovery(source, destination, continuation, + tracingContext, sourceEtag, isMetadataIncompleteState); } + // Rename with eTag in any other case. + return renameWithETagRecovery(source, destination, continuation, + tracingContext, sourceEtag, isMetadataIncompleteState); } /** @@ -1696,4 +1560,297 @@ public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders } return clientTransactionId; } + + /** + * Attempts to rename a path with client transaction ID (CTId) recovery mechanism. + * If the initial rename attempt fails, it tries to recover using CTId or ETag + * and retries the operation. + * + * @param source the source path to be renamed + * @param destination the destination path for the rename + * @param continuation the continuation token for the operation + * @param tracingContext the context for tracing the operation + * @param sourceEtag the ETag of the source path for conditional requests + * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete + * @return an {@link AbfsClientRenameResult} containing the result of the rename operation + * @throws IOException if an error occurs during the rename operation + */ + private AbfsClientRenameResult renameWithCTIdRecovery(String source, + String destination, String continuation, TracingContext tracingContext, + String sourceEtag, boolean isMetadataIncompleteState) throws IOException { + // Get request headers for rename operation + List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source); + // Add client transaction ID to the request headers + String clientTransactionId = addClientTransactionIdToHeader(requestHeaders); + + // Create the URL for the rename operation + final URL url = createRequestUrl(destination, + getRenameQueryBuilder(destination, continuation).toString()); + + // Create the rename operation + AbfsRestOperation op = 
createRenameRestOperation(url, requestHeaders); + try { + incrementAbfsRenamePath(); + op.execute(tracingContext); + // If we successfully rename a path and isMetadataIncompleteState is true, + // then the rename was recovered; otherwise, it wasn’t. + // This is why isMetadataIncompleteState is used for renameRecovery (as the second parameter). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // Handle rename exceptions and retry if applicable + handleRenameException(source, destination, continuation, + tracingContext, sourceEtag, op, isMetadataIncompleteState, e); + + // Check if the operation is a retried request and if the error code indicates + // that the source path was not found. If so, attempt recovery using CTId. + if (op.isARetriedRequest() + && SOURCE_PATH_NOT_FOUND.getErrorCode() + .equalsIgnoreCase(op.getResult().getStorageErrorCode())) { + if (recoveryUsingCTId(destination, tracingContext, clientTransactionId)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, isMetadataIncompleteState); + } + } + + // Attempt recovery using ETag if applicable + if (recoveryUsingEtag(source, destination, sourceEtag, + op, tracingContext, true)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, + isMetadataIncompleteState); + } + throw e; + } + } + + /** + * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts + * to fetch it and retry the operation. If recovery fails, it throws the exception. 
+ * + * @param source the source path to be renamed + * @param destination the destination path for the rename + * @param continuation the continuation token for the operation + * @param tracingContext the context for tracing the operation + * @param sourceEtag the ETag of the source path for conditional requests + * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete + * @return an {@link AbfsClientRenameResult} containing the result of the rename operation + * @throws IOException if an error occurs during the rename operation or recovery + */ + private AbfsClientRenameResult renameWithETagRecovery(String source, + String destination, String continuation, + TracingContext tracingContext, String sourceEtag, + boolean isMetadataIncompleteState) throws IOException { + boolean hasEtag = !isEmpty(sourceEtag); + boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled(); + if (!hasEtag && shouldAttemptRecovery) { + // in case eTag is already not supplied to the API + // and rename resilience is expected and it is an HNS enabled account + // fetch the source etag to be used later in recovery + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext, null); + if (srcStatusOp.hasResult()) { + final AbfsHttpOperation result = srcStatusOp.getResult(); + sourceEtag = extractEtagHeader(result); + // and update the directory status. 
+ boolean isDir = checkIsDir(result); + shouldAttemptRecovery = !isDir; + LOG.debug( + "Retrieved etag of source for rename recovery: {}; isDir={}", + sourceEtag, isDir); + } + } catch (AbfsRestOperationException e) { + throw new AbfsRestOperationException(e.getStatusCode(), + SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e); + } + } + + // Get request headers for rename operation + List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source); + + // Create the URL for the rename operation + final URL url = createRequestUrl(destination, + getRenameQueryBuilder(destination, continuation).toString()); + + // Create the rename operation + AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); + try { + incrementAbfsRenamePath(); + op.execute(tracingContext); + // If we successfully rename a path and isMetadataIncompleteState is true, + // then the rename was recovered; otherwise, it wasn’t. + // This is why isMetadataIncompleteState is used for renameRecovery (as the second parameter). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // Handle rename exceptions and retry if applicable + handleRenameException(source, destination, continuation, + tracingContext, sourceEtag, op, isMetadataIncompleteState, e); + + // Attempt recovery using ETag if applicable + if (recoveryUsingEtag(source, destination, sourceEtag, + op, tracingContext, shouldAttemptRecovery)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, isMetadataIncompleteState); + } + throw e; + } + } + + /** + * Creates a list of HTTP headers required for a rename operation, including the encoded source path + * and SAS token if applicable. 
+ * + * @param source the source path for the rename operation + * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request + * @throws IOException if an error occurs while creating the headers or encoding the source path + */ + private List<AbfsHttpHeader> getHeadersForRename(final String source) + throws IOException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + String encodedRenameSource = urlEncode( + FORWARD_SLASH + this.getFileSystem() + source); + + if (getAuthType() == AuthType.SAS) { + final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); + appendSASTokenToQuery(source, + SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder); + encodedRenameSource += srcQueryBuilder.toString(); + } + + LOG.trace("Rename source queryparam added {}", encodedRenameSource); + requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + return requestHeaders; + } + + /** + * Builds a query builder for the rename operation URL, including the continuation token and SAS token + * for the destination path. 
+ * + * @param destination the destination path for the rename operation + * @param continuation the continuation token for the operation + * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation + * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token + */ + private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination, + final String continuation) throws AzureBlobFileSystemException { + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + appendSASTokenToQuery(destination, + SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); + return abfsUriQueryBuilder; + } + + /** + * Attempts to recover a rename operation using the client transaction ID (CTId). + * It checks if the provided CTId matches the one in the response header for the destination path. + * + * @param destination the destination path for the rename operation + * @param tracingContext the context for tracing the operation + * @param clientTransactionId the client transaction ID to be used for recovery + * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise + * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status + */ + private boolean recoveryUsingCTId(String destination, + TracingContext tracingContext, String clientTransactionId) + throws AzureBlobFileSystemException { + try { + final AbfsHttpOperation abfsHttpOperation = + getPathStatus(destination, false, + tracingContext, null).getResult(); + return clientTransactionId.equals( + abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)); + } catch (AzureBlobFileSystemException exception) { + throw new AbfsDriverException(ERR_RENAME_RECOVERY + destination, exception); + } + } + + /** + * Attempts recovery using an ETag for the given source and destination. 
+ * If recovery is enabled and rename resilience is supported, performs an idempotency check + * for the rename operation. + * + * @param source the source path to be renamed + * @param destination the destination path for the rename + * @param sourceEtag the ETag of the source path for conditional requests + * @param op the AbfsRestOperation object for the rename operation + * @param tracingContext the context for tracing the operation + * @param shouldAttemptRecovery flag indicating whether recovery should be attempted + * @return true if the recovery attempt was successful, false otherwise + */ + private boolean recoveryUsingEtag(String source, String destination, + String sourceEtag, AbfsRestOperation op, TracingContext tracingContext, + boolean shouldAttemptRecovery) { + if (shouldAttemptRecovery && isRenameResilience()) { + return renameIdempotencyCheckOp(source, sourceEtag, + op, destination, tracingContext); + } + return false; + } + + /** + * Checks for rename operation exceptions and handles them accordingly. + * Throws an exception or retries the operation if certain error conditions are met, + * such as unauthorized overwrite or missing destination parent path. + * + * @param source The source path for the rename operation. + * @param destination The destination path for the rename operation. + * @param continuation Continuation token for the operation, if applicable. + * @param tracingContext The tracing context for tracking the operation. + * @param sourceEtag The ETag of the source path for metadata validation. + * @param op The ABFS operation result for the rename attempt. + * @param isMetadataIncompleteState Flag indicating if metadata is incomplete. + * @throws IOException If an I/O error occurs during the rename operation. + * @throws FileAlreadyExistsException If the destination file already exists. 
+ */ + private void handleRenameException(final String source, + final String destination, final String continuation, + final TracingContext tracingContext, final String sourceEtag, + final AbfsRestOperation op, boolean isMetadataIncompleteState, + AzureBlobFileSystemException e) throws IOException { + if (!op.hasResult()) { + throw e; + } + + // ref: HADOOP-19393. Write permission checks can occur before validating + // rename operation's validity. If there is an existing destination path, it may be rejected + // with an authorization error. Catching and throwing FileAlreadyExistsException instead. + if (UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode() + .equals(op.getResult().getStorageErrorCode())) { + throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS); + } + + // ref: HADOOP-18242. Rename failure occurring due to a rare case of + // tracking metadata being in incomplete state. + if (RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode() + .equals(op.getResult().getStorageErrorCode()) + && !isMetadataIncompleteState) { + ABFS_METADATA_INCOMPLETE_RENAME_FAILURE.info( + "Rename Failure attempting to resolve tracking metadata state and retrying."); + isMetadataIncompleteState = true; + String sourceEtagAfterFailure = sourceEtag; + if (isEmpty(sourceEtagAfterFailure)) { + // Doing a HEAD call resolves the incomplete metadata state and + // then we can retry the rename operation. + AbfsRestOperation sourceStatusOp = getPathStatus(source, false, + tracingContext, null); + // Extract the sourceEtag, using the status Op, and set it + // for future rename recovery. + AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); + sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); + } + + // Retry the rename operation with the updated sourceEtag and isMetadataIncompleteState. 
+ renamePath(source, destination, continuation, + tracingContext, sourceEtagAfterFailure, isMetadataIncompleteState); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index da761f7b4e0..6ee582c5a21 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -72,7 +72,7 @@ public final class AbfsErrors { public static final String ERR_CREATE_RECOVERY = "Error while recovering from create failure."; public static final String ERR_RENAME_RECOVERY = - "Error while recovering from rename failure."; + "Error while recovering from rename failure for path: "; public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List Response Failed in BlobClient."; public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List Response Failed in DfsClient."; public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem."; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 4b08e7d2246..b719a3217b2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -2289,6 +2289,6 @@ public void answer(final AbfsRestOperation mockedObj, null, op); } } - }); + }, 0); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index fa9b63c0ef9..e2f8b679fc5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2673,12 +2673,13 @@ public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception { final String[] clientTransactionId = new String[1]; mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); mockRetriedRequest(abfsDfsClient, new ArrayList<>()); - boolean[] flag = new boolean[1]; + int[] flag = new int[1]; Mockito.doAnswer(getPathStatus -> { - if (!flag[0]) { - flag[0] = true; + if (flag[0] == 1) { + flag[0] += 1; throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception()); } + flag[0] += 1; return getPathStatus.callRealMethod(); }).when(abfsDfsClient).getPathStatus( Mockito.nullable(String.class), Mockito.nullable(Boolean.class), @@ -2737,6 +2738,6 @@ public void answer(final AbfsRestOperation mockedObj, SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op); } } - }); + }, 1); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 274230e4b38..2f433b2b400 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -152,11 +152,11 @@ private boolean isThreadRunning(String threadName) { * @throws Exception if an error occurs while mocking the operation creation */ public static void mockAbfsOperationCreation(final AbfsClient abfsClient, - final MockIntercept mockIntercept) throws Exception { - boolean[] flag = new 
boolean[1]; + final MockIntercept mockIntercept, int failedCall) throws Exception { + int[] flag = new int[1]; Mockito.doAnswer(answer -> { - if (!flag[0]) { - flag[0] = true; + if (flag[0] == failedCall) { + flag[0] += 1; AbfsRestOperation op = Mockito.spy( new AbfsRestOperation( answer.getArgument(0), @@ -174,6 +174,7 @@ public static void mockAbfsOperationCreation(final AbfsClient abfsClient, Mockito.doReturn(true).when(op).isARetriedRequest(); return op; } + flag[0] += 1; return answer.callRealMethod(); }).when(abfsClient) .getAbfsRestOperation(any(), any(), any(), any()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index b040a4b12f9..c859be5fd0d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -58,7 +59,9 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; import static 
org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -180,6 +183,12 @@ AbfsClient getMockAbfsClient() throws IOException { .when(spyClient) .createRenameRestOperation(Mockito.any(URL.class), anyList()); + Mockito.doCallRealMethod() + .when(spyClient) + .getPathStatus(anyString(), anyBoolean(), + Mockito.any(TracingContext.class), + Mockito.any(ContextEncryptionAdapter.class)); + return spyClient; } @@ -275,9 +284,14 @@ public void testRenameRecoveryEtagMatchFsLevel() throws IOException { // 4 calls should have happened in total for rename // 1 -> original rename rest call, 2 -> first retry, // +2 for getPathStatus calls + int totalConnections = 4; + if (!getConfiguration().getIsClientTransactionIdEnabled()) { + // 1 additional getPathStatus call to get dest etag + totalConnections++; + } assertThatStatisticCounter(ioStats, CONNECTIONS_MADE.getStatName()) - .isEqualTo(5 + connMadeBeforeRename); + .isEqualTo(totalConnections + connMadeBeforeRename); // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 // retries happen internally within AbfsRestOperation execute() // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called @@ -350,21 +364,18 @@ public void testRenameRecoveryFailsForDirFsLevel() throws Exception { if (getConfiguration().getIsClientTransactionIdEnabled()) { // Recovery based on client transaction id should be successful assertTrue(renameResult); - // One extra getPathStatus call should have happened - newConnections = 5; } else { assertFalse(renameResult); - newConnections = 4; } // validating stat counters after rename - // 3 calls should have happened in total for rename + // 4 calls should have happened in total for rename // 1 -> original rename rest call, 2 -> first retry, // +1 for getPathStatus calls // last getPathStatus call should be skipped assertThatStatisticCounter(ioStats, CONNECTIONS_MADE.getStatName()) - .isEqualTo(newConnections + 
connMadeBeforeRename); + .isEqualTo(4 + connMadeBeforeRename); // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 // retries happen internally within AbfsRestOperation execute() --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org