[ https://issues.apache.org/jira/browse/HADOOP-19497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939904#comment-17939904 ]
ASF GitHub Bot commented on HADOOP-19497: ----------------------------------------- bhattmanish98 commented on code in PR #7509: URL: https://github.com/apache/hadoop/pull/7509#discussion_r2022158193 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java: ########## @@ -1664,4 +1528,301 @@ public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders } return clientTransactionId; } + + /** + * Attempts to rename a path with client transaction ID (CTId) recovery mechanism. + * If the initial rename attempt fails, it tries to recover using CTId or ETag + * and retries the operation. + * + * @param source the source path to be renamed + * @param destination the destination path for the rename + * @param continuation the continuation token for the operation + * @param tracingContext the context for tracing the operation + * @param sourceEtag the ETag of the source path for conditional requests + * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete + * @return an {@link AbfsClientRenameResult} containing the result of the rename operation + * @throws IOException if an error occurs during the rename operation + */ + private AbfsClientRenameResult renameWithCTIdRecovery(String source, + String destination, String continuation, TracingContext tracingContext, + String sourceEtag, boolean isMetadataIncompleteState) throws IOException { + // Get request headers for rename operation + List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source); + // Add client transaction ID to the request headers + String clientTransactionId = addClientTransactionIdToHeader(requestHeaders); + + // Create the URL for the rename operation + final URL url = createRequestUrl(destination, + getRenameQueryBuilder(destination, continuation).toString()); + + // Create the rename operation + AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); + try { + 
incrementAbfsRenamePath(); + op.execute(tracingContext); + // AbfsClientResult contains the AbfsOperation, If recovery happened or + // not, and the incompleteMetaDataState is true or false. + // If we successfully rename a path and isMetadataIncompleteState was + // true, then rename was recovered, else it didn't, this is why + // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // Handle rename exceptions and retry if applicable + handleRenameException(source, destination, continuation, + tracingContext, sourceEtag, op, isMetadataIncompleteState, e); + + // Check if the operation is a retried request and if the error code indicates + // that the source path was not found. If so, attempt recovery using CTId. + if (op.isARetriedRequest() + && SOURCE_PATH_NOT_FOUND.getErrorCode() + .equalsIgnoreCase(op.getResult().getStorageErrorCode())) { + if (recoveryUsingCTId(destination, tracingContext, clientTransactionId)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, isMetadataIncompleteState); + } + } + + // Attempt recovery using ETag if applicable + if (recoveryUsingEtag(source, destination, sourceEtag, + op, tracingContext, true)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, + isMetadataIncompleteState); + } + throw e; + } + } + + /** + * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts + * to fetch it and retry the operation. If recovery fails, it throws the exception. 
+ * + * @param source the source path to be renamed + * @param destination the destination path for the rename + * @param continuation the continuation token for the operation + * @param tracingContext the context for tracing the operation + * @param sourceEtag the ETag of the source path for conditional requests + * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete + * @return an {@link AbfsClientRenameResult} containing the result of the rename operation + * @throws IOException if an error occurs during the rename operation or recovery + */ + private AbfsClientRenameResult renameWithETagRecovery(String source, + String destination, String continuation, + TracingContext tracingContext, String sourceEtag, + boolean isMetadataIncompleteState) throws IOException { + boolean hasEtag = !isEmpty(sourceEtag); + boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled(); + if (!hasEtag && shouldAttemptRecovery) { + // in case eTag is already not supplied to the API + // and rename resilience is expected and it is an HNS enabled account + // fetch the source etag to be used later in recovery + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext, null); + if (srcStatusOp.hasResult()) { + final AbfsHttpOperation result = srcStatusOp.getResult(); + sourceEtag = extractEtagHeader(result); + // and update the directory status. 
+ boolean isDir = checkIsDir(result); + shouldAttemptRecovery = !isDir; + LOG.debug( + "Retrieved etag of source for rename recovery: {}; isDir={}", + sourceEtag, isDir); + } + } catch (AbfsRestOperationException e) { + throw new AbfsRestOperationException(e.getStatusCode(), + SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e); + } + } + + // Get request headers for rename operation + List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source); + + // Create the URL for the rename operation + final URL url = createRequestUrl(destination, + getRenameQueryBuilder(destination, continuation).toString()); + + // Create the rename operation + AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); + try { + incrementAbfsRenamePath(); + op.execute(tracingContext); + // AbfsClientResult contains the AbfsOperation, If recovery happened or + // not, and the incompleteMetaDataState is true or false. + // If we successfully rename a path and isMetadataIncompleteState was + // true, then rename was recovered, else it didn't, this is why + // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // Handle rename exceptions and retry if applicable + handleRenameException(source, destination, continuation, + tracingContext, sourceEtag, op, isMetadataIncompleteState, e); + + // Attempt recovery using ETag if applicable + if (recoveryUsingEtag(source, destination, sourceEtag, + op, tracingContext, shouldAttemptRecovery)) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), + true, isMetadataIncompleteState); + } + throw e; + } + } + + /** + * Creates a list of HTTP headers required for a rename operation, including the encoded source path + * and SAS token if applicable. 
+ * + * @param source the source path for the rename operation + * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request + * @throws IOException if an error occurs while creating the headers or encoding the source path + */ + private List<AbfsHttpHeader> getHeadersForRename(final String source) + throws IOException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + String encodedRenameSource = urlEncode( + FORWARD_SLASH + this.getFileSystem() + source); + + if (getAuthType() == AuthType.SAS) { + final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); + appendSASTokenToQuery(source, + SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder); + encodedRenameSource += srcQueryBuilder.toString(); + } + + LOG.trace("Rename source queryparam added {}", encodedRenameSource); + requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + return requestHeaders; + } + + /** + * Builds a query builder for the rename operation URL, including the continuation token and SAS token + * for the destination path. 
+ * + * @param destination the destination path for the rename operation + * @param continuation the continuation token for the operation + * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation + * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token + */ + private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination, + final String continuation) throws AzureBlobFileSystemException { + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + appendSASTokenToQuery(destination, + SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); + return abfsUriQueryBuilder; + } + + /** + * Attempts to recover a rename operation using the client transaction ID (CTId). + * It checks if the provided CTId matches the one in the response header for the destination path. + * + * @param destination the destination path for the rename operation + * @param tracingContext the context for tracing the operation + * @param clientTransactionId the client transaction ID to be used for recovery + * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise + * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status + */ + private boolean recoveryUsingCTId(String destination, + TracingContext tracingContext, String clientTransactionId) + throws AzureBlobFileSystemException { + try { + final AbfsHttpOperation abfsHttpOperation = + getPathStatus(destination, false, + tracingContext, null).getResult(); + return clientTransactionId.equals( + abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)); + } catch (AzureBlobFileSystemException exception) { + throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception); Review Comment: Taken! 
> [ABFS] Enable rename and create recovery from client transaction id over DFS > endpoint > ------------------------------------------------------------------------------------- > > Key: HADOOP-19497 > URL: https://issues.apache.org/jira/browse/HADOOP-19497 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0 > Reporter: Manish Bhatt > Assignee: Manish Bhatt > Priority: Major > Labels: pull-request-available > > We have implemented create and rename recovery using client transaction IDs > over the DFS endpoint ([HADOOP-19450] [ABFS] Rename/Create path idempotency > client-level resolution - ASF JIRA). Because the backend changes had not been > fully rolled out, we initially shipped this work with the feature flag > disabled. With this update, we enable the flag, so the client starts sending > client transaction IDs. If an operation fails, the client will attempt to > recover from the failed state where possible. The details and considerations > for this change are as follows: > 1. **Implementation Overview**: We introduced a mechanism for create and > rename recovery via client transaction IDs to improve reliability and data > integrity over the DFS endpoint. This mechanism was initially shipped with > its flag disabled because the backend rollout was incomplete. > 2. **Current Update**: With the backend changes now in place, we are ready to > enable the flag. This activates the sending of client transaction IDs, > allowing us to track and manage transactions more effectively. > 3. **Failure Recovery**: The primary advantage of enabling this flag is the > ability to recover from failed states. If an operation fails, the client can > use the client transaction ID to attempt recovery, minimizing data loss and > ensuring continuity. For example, if a retried rename fails because the > source path no longer exists, the client checks whether the destination > carries the same client transaction ID and, if so, treats the rename as > successful. > 4. **Next Steps**: We will proceed with enabling the flag and closely monitor > the system's performance. Any issues or failures will be documented and > addressed promptly to ensure a smooth transition.
-- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org