bhattmanish98 commented on code in PR #7509:
URL: https://github.com/apache/hadoop/pull/7509#discussion_r2022205902


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java:
##########
@@ -1664,4 +1528,301 @@ public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders
     }
     return clientTransactionId;
   }
+
+  /**
+   * Attempts to rename a path with client transaction ID (CTId) recovery mechanism.
+   * If the initial rename attempt fails, it tries to recover using CTId or ETag
+   * and retries the operation.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation
+   */
+  private AbfsClientRenameResult renameWithCTIdRecovery(String source,
+      String destination, String continuation, TracingContext tracingContext,
+      String sourceEtag, boolean isMetadataIncompleteState) throws IOException 
{
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+    // Add client transaction ID to the request headers
+    String clientTransactionId = 
addClientTransactionIdToHeader(requestHeaders);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // AbfsClientResult contains the AbfsOperation, If recovery happened or
+      // not, and the incompleteMetaDataState is true or false.
+      // If we successfully rename a path and isMetadataIncompleteState was
+      // true, then rename was recovered, else it didn't, this is why
+      // isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Check if the operation is a retried request and if the error code indicates
+      // that the source path was not found. If so, attempt recovery using CTId.
+      if (op.isARetriedRequest()
+          && SOURCE_PATH_NOT_FOUND.getErrorCode()
+          .equalsIgnoreCase(op.getResult().getStorageErrorCode())) {
+        if (recoveryUsingCTId(destination, tracingContext, 
clientTransactionId)) {
+          return new AbfsClientRenameResult(
+              getSuccessOp(AbfsRestOperationType.RenamePath,
+                  HTTP_METHOD_PUT, url, requestHeaders),
+              true, isMetadataIncompleteState);
+        }
+      }
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, true)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true,
+            isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts
+   * to fetch it and retry the operation. If recovery fails, it throws the exception.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation or recovery
+   */
+  private AbfsClientRenameResult renameWithETagRecovery(String source,
+      String destination, String continuation,
+      TracingContext tracingContext, String sourceEtag,
+      boolean isMetadataIncompleteState) throws IOException {
+    boolean hasEtag = !isEmpty(sourceEtag);
+    boolean shouldAttemptRecovery = isRenameResilience() && 
getIsNamespaceEnabled();
+    if (!hasEtag && shouldAttemptRecovery) {
+      // in case eTag is already not supplied to the API
+      // and rename resilience is expected and it is an HNS enabled account
+      // fetch the source etag to be used later in recovery
+      try {
+        final AbfsRestOperation srcStatusOp = getPathStatus(source,
+            false, tracingContext, null);
+        if (srcStatusOp.hasResult()) {
+          final AbfsHttpOperation result = srcStatusOp.getResult();
+          sourceEtag = extractEtagHeader(result);
+          // and update the directory status.
+          boolean isDir = checkIsDir(result);
+          shouldAttemptRecovery = !isDir;
+          LOG.debug(
+              "Retrieved etag of source for rename recovery: {}; isDir={}",
+              sourceEtag, isDir);
+        }
+      } catch (AbfsRestOperationException e) {
+        throw new AbfsRestOperationException(e.getStatusCode(),
+            SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e);
+      }
+    }
+
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // AbfsClientResult contains the AbfsOperation, If recovery happened or
+      // not, and the incompleteMetaDataState is true or false.
+      // If we successfully rename a path and isMetadataIncompleteState was
+      // true, then rename was recovered, else it didn't, this is why
+      // isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, shouldAttemptRecovery)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true, isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Creates a list of HTTP headers required for a rename operation, including the encoded source path
+   * and SAS token if applicable.
+   *
+   * @param source the source path for the rename operation
+   * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request
+   * @throws IOException if an error occurs while creating the headers or encoding the source path
+   */
+  private List<AbfsHttpHeader> getHeadersForRename(final String source)
+      throws IOException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    String encodedRenameSource = urlEncode(
+        FORWARD_SLASH + this.getFileSystem() + source);
+
+    if (getAuthType() == AuthType.SAS) {
+      final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+      appendSASTokenToQuery(source,
+          SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
+      encodedRenameSource += srcQueryBuilder.toString();
+    }
+
+    LOG.trace("Rename source queryparam added {}", encodedRenameSource);
+    requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, 
encodedRenameSource));
+    requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+    return requestHeaders;
+  }
+
+  /**
+   * Builds a query builder for the rename operation URL, including the continuation token and SAS token
+   * for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param continuation the continuation token for the operation
+   * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation
+   * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token
+   */
+  private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination,
+      final String continuation) throws AzureBlobFileSystemException {
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    appendSASTokenToQuery(destination,
+        SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
+    return abfsUriQueryBuilder;
+  }
+
+  /**
+   * Attempts to recover a rename operation using the client transaction ID (CTId).
+   * It checks if the provided CTId matches the one in the response header for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param tracingContext the context for tracing the operation
+   * @param clientTransactionId the client transaction ID to be used for recovery
+   * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise
+   * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status
+   */
+  private boolean recoveryUsingCTId(String destination,
+      TracingContext tracingContext, String clientTransactionId)
+      throws AzureBlobFileSystemException {
+    try {
+      final AbfsHttpOperation abfsHttpOperation =
+          getPathStatus(destination, false,
+              tracingContext, null).getResult();
+      return clientTransactionId.equals(
+          abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID));
+    } catch (AzureBlobFileSystemException exception) {
+      throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception);
+    }
+  }
+
+  /**
+   * Attempts recovery using an ETag for the given source and destination.
+   * If recovery is enabled and rename resilience is supported, performs an idempotency check
+   * for the rename operation.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param op the AbfsRestOperation object for the rename operation
+   * @param tracingContext the context for tracing the operation
+   * @param shouldAttemptRecovery flag indicating whether recovery should be attempted
+   * @return true if the recovery attempt was successful, false otherwise
+   */
+  private boolean recoveryUsingEtag(String source, String destination,
+      String sourceEtag, AbfsRestOperation op, TracingContext tracingContext,
+      boolean shouldAttemptRecovery) {
+    if (shouldAttemptRecovery && isRenameResilience()) {
+      return renameIdempotencyCheckOp(source, sourceEtag,
+          op, destination, tracingContext);
+    }
+    return false;
+  }
+
+  /**
+   * Checks for rename operation exceptions and handles them accordingly.
+   * Throws an exception or retries the operation if certain error conditions are met,
+   * such as unauthorized overwrite or missing destination parent path.
+   *
+   * @param source The source path for the rename operation.
+   * @param destination The destination path for the rename operation.
+   * @param continuation Continuation token for the operation, if applicable.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @param sourceEtag The ETag of the source path for metadata validation.
+   * @param op The ABFS operation result for the rename attempt.
+   * @param isMetadataIncompleteState Flag indicating if metadata is incomplete.
+   * @throws IOException If an I/O error occurs during the rename operation.
+   * @throws FileAlreadyExistsException If the destination file already exists and overwrite is unauthorized.

Review Comment:
   Removed `and overwrite is unauthorized` from the message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to