[ 
https://issues.apache.org/jira/browse/HADOOP-19497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939904#comment-17939904
 ] 

ASF GitHub Bot commented on HADOOP-19497:
-----------------------------------------

bhattmanish98 commented on code in PR #7509:
URL: https://github.com/apache/hadoop/pull/7509#discussion_r2022158193


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java:
##########
@@ -1664,4 +1528,301 @@ public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders
     }
     return clientTransactionId;
   }
+
+  /**
+   * Attempts to rename a path with client transaction ID (CTId) recovery mechanism.
+   * If the initial rename attempt fails, it tries to recover using CTId or ETag
+   * and retries the operation.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation
+   */
+  private AbfsClientRenameResult renameWithCTIdRecovery(String source,
+      String destination, String continuation, TracingContext tracingContext,
+      String sourceEtag, boolean isMetadataIncompleteState) throws IOException {
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+    // Add client transaction ID to the request headers
+    String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // AbfsClientRenameResult holds the AbfsRestOperation, whether recovery
+      // happened, and whether the metadata was in an incomplete state.
+      // If the rename succeeds and isMetadataIncompleteState was true, the
+      // rename was recovered; otherwise it was not. This is why
+      // isMetadataIncompleteState is also passed as the rename-recovery flag
+      // (the 2nd param).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Check if the operation is a retried request and if the error code indicates
+      // that the source path was not found. If so, attempt recovery using CTId.
+      if (op.isARetriedRequest()
+          && SOURCE_PATH_NOT_FOUND.getErrorCode()
+          .equalsIgnoreCase(op.getResult().getStorageErrorCode())) {
+        if (recoveryUsingCTId(destination, tracingContext, clientTransactionId)) {
+          return new AbfsClientRenameResult(
+              getSuccessOp(AbfsRestOperationType.RenamePath,
+                  HTTP_METHOD_PUT, url, requestHeaders),
+              true, isMetadataIncompleteState);
+        }
+      }
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, true)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true,
+            isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts
+   * to fetch it and retry the operation. If recovery fails, it throws the exception.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation or recovery
+   */
+  private AbfsClientRenameResult renameWithETagRecovery(String source,
+      String destination, String continuation,
+      TracingContext tracingContext, String sourceEtag,
+      boolean isMetadataIncompleteState) throws IOException {
+    boolean hasEtag = !isEmpty(sourceEtag);
+    boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled();
+    if (!hasEtag && shouldAttemptRecovery) {
+      // If no eTag was supplied to the API, rename resilience is enabled,
+      // and the account is HNS-enabled, fetch the source eTag now so it can
+      // be used later in recovery.
+      try {
+        final AbfsRestOperation srcStatusOp = getPathStatus(source,
+            false, tracingContext, null);
+        if (srcStatusOp.hasResult()) {
+          final AbfsHttpOperation result = srcStatusOp.getResult();
+          sourceEtag = extractEtagHeader(result);
+          // and update the directory status.
+          boolean isDir = checkIsDir(result);
+          shouldAttemptRecovery = !isDir;
+          LOG.debug(
+              "Retrieved etag of source for rename recovery: {}; isDir={}",
+              sourceEtag, isDir);
+        }
+      } catch (AbfsRestOperationException e) {
+        throw new AbfsRestOperationException(e.getStatusCode(),
+            SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e);
+      }
+    }
+
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // AbfsClientRenameResult holds the AbfsRestOperation, whether recovery
+      // happened, and whether the metadata was in an incomplete state.
+      // If the rename succeeds and isMetadataIncompleteState was true, the
+      // rename was recovered; otherwise it was not. This is why
+      // isMetadataIncompleteState is also passed as the rename-recovery flag
+      // (the 2nd param).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, shouldAttemptRecovery)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true, isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Creates a list of HTTP headers required for a rename operation, including the encoded source path
+   * and SAS token if applicable.
+   *
+   * @param source the source path for the rename operation
+   * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request
+   * @throws IOException if an error occurs while creating the headers or encoding the source path
+   */
+  private List<AbfsHttpHeader> getHeadersForRename(final String source)
+      throws IOException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    String encodedRenameSource = urlEncode(
+        FORWARD_SLASH + this.getFileSystem() + source);
+
+    if (getAuthType() == AuthType.SAS) {
+      final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+      appendSASTokenToQuery(source,
+          SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
+      encodedRenameSource += srcQueryBuilder.toString();
+    }
+
+    LOG.trace("Rename source queryparam added {}", encodedRenameSource);
+    requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
+    requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+    return requestHeaders;
+  }
+
+  /**
+   * Builds a query builder for the rename operation URL, including the continuation token and SAS token
+   * for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param continuation the continuation token for the operation
+   * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation
+   * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token
+   */
+  private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination,
+      final String continuation) throws AzureBlobFileSystemException {
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    appendSASTokenToQuery(destination,
+        SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
+    return abfsUriQueryBuilder;
+  }
+
+  /**
+   * Attempts to recover a rename operation using the client transaction ID (CTId).
+   * It checks if the provided CTId matches the one in the response header for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param tracingContext the context for tracing the operation
+   * @param clientTransactionId the client transaction ID to be used for recovery
+   * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise
+   * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status
+   */
+  private boolean recoveryUsingCTId(String destination,
+      TracingContext tracingContext, String clientTransactionId)
+      throws AzureBlobFileSystemException {
+    try {
+      final AbfsHttpOperation abfsHttpOperation =
+          getPathStatus(destination, false,
+              tracingContext, null).getResult();
+      return clientTransactionId.equals(
+          abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID));
+    } catch (AzureBlobFileSystemException exception) {
+      throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception);

Review Comment:
   Taken!





> [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint
> -------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19497
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19497
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> We have implemented create and rename recovery using client transaction IDs
> over the DFS endpoint (HADOOP-19450, "[ABFS] Rename/Create path idempotency
> client-level resolution"). Because the backend changes were not fully rolled
> out at the time, the feature was initially shipped with its flag disabled.
> This update enables the flag, so the client starts sending client
> transaction IDs. If an operation fails, we attempt to recover from the
> failed state where possible. The detailed steps and considerations are:
> 1. **Implementation Overview**: We introduced a mechanism for create and
> rename recovery via client transaction IDs to enhance reliability and data
> integrity over the DFS endpoint. The change initially shipped with its flag
> disabled because the backend rollout was incomplete.
> 2. **Current Update**: With the backend changes now in place, we are ready to 
> enable the flag. This will activate the sending of client transaction IDs, 
> allowing us to track and manage transactions more effectively.
> 3. **Failure Recovery**: The primary advantage of enabling this flag is the 
> potential for recovery from failed states. If a transaction fails, we can use 
> the client transaction ID to attempt a recovery, minimizing data loss and 
> ensuring continuity.
> 4. **Next Steps**: We will proceed with enabling the flag and closely monitor 
> the system's performance. Any issues or failures will be documented and 
> addressed promptly to ensure a smooth transition.



