[ https://issues.apache.org/jira/browse/HADOOP-19450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929578#comment-17929578 ]

ASF GitHub Bot commented on HADOOP-19450:
-----------------------------------------

anujmodi2021 commented on code in PR #7364:
URL: https://github.com/apache/hadoop/pull/7364#discussion_r1967062469


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -216,6 +216,11 @@ private AbfsClient(final URL baseUrl,
       encryptionType = EncryptionType.GLOBAL_KEY;
     }
 
+    // Version update needed to support x-ms-client-transaction-id header

Review Comment:
   I think we might need to be careful here.
   1. Do we want to upgrade the version for all APIs here, or just for Rename/Create?
   2. What happens when the driver upgrades the version overall? It might lead to a 
downgrade. Maybe we should check whether the current version is less than the 
required one and only update in that case; otherwise, use the default version.
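The guard suggested in point 2 can be sketched as follows. This is only an illustration of the review suggestion, not code from the PR; the constant names (`DEFAULT_X_MS_VERSION`, `MIN_VERSION_FOR_CLIENT_TRANSACTION_ID`) and the method name are hypothetical. It relies on the fact that `x-ms-version` values are dates in `yyyy-MM-dd` form, so lexicographic comparison orders them chronologically.

```java
public class AbfsVersionGuard {
  // Hypothetical values: the real constants in AbfsClient may differ.
  static final String DEFAULT_X_MS_VERSION = "2021-06-08";
  static final String MIN_VERSION_FOR_CLIENT_TRANSACTION_ID = "2023-08-03";

  /**
   * Only bump the API version when the current one is older than the
   * minimum required for x-ms-client-transaction-id; otherwise keep the
   * (possibly newer) current version to avoid an accidental downgrade.
   */
  static String resolveVersion(String currentVersion) {
    // yyyy-MM-dd strings compare chronologically via compareTo.
    if (currentVersion.compareTo(MIN_VERSION_FOR_CLIENT_TRANSACTION_ID) < 0) {
      return MIN_VERSION_FOR_CLIENT_TRANSACTION_ID;
    }
    return currentVersion;
  }

  public static void main(String[] args) {
    // An older default is bumped up to the required minimum.
    System.out.println(resolveVersion("2021-06-08")); // 2023-08-03
    // A newer version is left untouched (no downgrade).
    System.out.println(resolveVersion("2024-05-04")); // 2024-05-04
  }
}
```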



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java:
##########
@@ -733,4 +737,53 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV
     }
     assertEquals(exceptionCaught, exceptionVal);
   }
+
+  /**
+   * Assumes that recovery through client transaction ID is enabled.
+   * Namespace is enabled for the given AzureBlobFileSystem.
+   * Service type is DFS.
+   * Assumes that the client transaction ID is enabled in the configuration.
+   *
+   * @param fs the AzureBlobFileSystem instance to check
+   * @throws AzureBlobFileSystemException in case of an error
+   */
+  protected void assumeRecoveryThroughClientTransactionID(
+      AzureBlobFileSystem fs, boolean isCreate)
+      throws AzureBlobFileSystemException {
+    // Assumes that recovery through client transaction ID is enabled.
+    Assume.assumeTrue(getConfiguration().getIsClientTransactionIdEnabled());
+    // Assumes that service type is DFS.
+    assumeDfsServiceType();
+    // Assumes that namespace is enabled for the given AzureBlobFileSystem.
+    Assume.assumeTrue(
+        fs.getIsNamespaceEnabled(getTestTracingContext(fs, true)));
+    if (isCreate) {
+      // Assume that create client is DFS client.
+      Assume.assumeTrue(
+          AbfsServiceType.DFS.equals(
+              fs.getAbfsStore().getAbfsConfiguration().getIngressServiceType()));
+      // Assume that append blob is not enabled in DFS client.
+      Assume.assumeFalse(isAppendBlobEnabled());
+    }
+  }
+
+  /**
+   * Mocks the behavior of adding a client transaction ID to the request headers
+   * for the given AzureBlobFileSystem. This method generates a random transaction ID
+   * and adds it to the headers of the {@link AbfsDfsClient}.
+   *
+   * @param abfsDfsClient The {@link AbfsDfsClient} mocked AbfsDfsClient.
+   * @param clientTransactionId An array to hold the generated transaction ID.
+   */
+  protected void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient,

Review Comment:
   Can this be moved to the `AbfsClientTestUtil` class or a similar utility class? 
It doesn't seem like a base-class concern.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java:
##########
@@ -167,4 +187,163 @@ public void testMkdirWithExistingFilename() throws Exception {
     intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(new Path("/testFilePath")));
     intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(new Path("/testFilePath/newDir")));
   }
+
+  /**
+   * Tests the idempotency of creating a path with retries by simulating
+   * a conflict response (HTTP 409) from the Azure Blob File System client.
+   * The method ensures that the path creation operation retries correctly
+   * with the proper transaction ID headers, verifying idempotency during
+   * failure recovery.
+   *
+   * @throws Exception if any error occurs during the operation.
+   */
+  @Test
+  public void createPathRetryIdempotency() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");

Review Comment:
   Instead of enabling it just for this test, we should enable it for all the 
create/mkdir/rename tests, IMO.
   This is a service-side change, and the normal tests should also work with it.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.mockito.invocation.InvocationOnMock;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+
+/**
+ * Interface used to intercept and customize the behavior of mocked
+ * `AbfsRestOperation` objects. The implementing class should define
+ * how to handle the mock operation when it is invoked.
+ *
+ * @param <T> the type of the mocked object, typically an `AbfsRestOperation`
+ */
+public interface MockIntercept<T> {

Review Comment:
   Why is this needed?



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java:
##########
@@ -733,4 +737,53 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV
     }
     assertEquals(exceptionCaught, exceptionVal);
   }
+
+  /**
+   * Assumes that recovery through client transaction ID is enabled.
+   * Namespace is enabled for the given AzureBlobFileSystem.
+   * Service type is DFS.
+   * Assumes that the client transaction ID is enabled in the configuration.
+   *
+   * @param fs the AzureBlobFileSystem instance to check
+   * @throws AzureBlobFileSystemException in case of an error
+   */
+  protected void assumeRecoveryThroughClientTransactionID(
+      AzureBlobFileSystem fs, boolean isCreate)

Review Comment:
   We don't need `fs` here if we use the base-class utility method.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java:
##########
@@ -733,4 +737,53 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV
     }
     assertEquals(exceptionCaught, exceptionVal);
   }
+
+  /**
+   * Assumes that recovery through client transaction ID is enabled.
+   * Namespace is enabled for the given AzureBlobFileSystem.
+   * Service type is DFS.
+   * Assumes that the client transaction ID is enabled in the configuration.
+   *
+   * @param fs the AzureBlobFileSystem instance to check
+   * @throws AzureBlobFileSystemException in case of an error
+   */
+  protected void assumeRecoveryThroughClientTransactionID(
+      AzureBlobFileSystem fs, boolean isCreate)
+      throws AzureBlobFileSystemException {
+    // Assumes that recovery through client transaction ID is enabled.
+    Assume.assumeTrue(getConfiguration().getIsClientTransactionIdEnabled());
+    // Assumes that service type is DFS.
+    assumeDfsServiceType();
+    // Assumes that namespace is enabled for the given AzureBlobFileSystem.
+    Assume.assumeTrue(

Review Comment:
   We have a method for this as well. `assumeHnsEnabled`



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java:
##########
@@ -733,4 +737,53 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV
     }
     assertEquals(exceptionCaught, exceptionVal);
   }
+
+  /**
+   * Assumes that recovery through client transaction ID is enabled.
+   * Namespace is enabled for the given AzureBlobFileSystem.
+   * Service type is DFS.
+   * Assumes that the client transaction ID is enabled in the configuration.
+   *
+   * @param fs the AzureBlobFileSystem instance to check
+   * @throws AzureBlobFileSystemException in case of an error
+   */
+  protected void assumeRecoveryThroughClientTransactionID(
+      AzureBlobFileSystem fs, boolean isCreate)
+      throws AzureBlobFileSystemException {
+    // Assumes that recovery through client transaction ID is enabled.
+    Assume.assumeTrue(getConfiguration().getIsClientTransactionIdEnabled());
+    // Assumes that service type is DFS.
+    assumeDfsServiceType();
+    // Assumes that namespace is enabled for the given AzureBlobFileSystem.
+    Assume.assumeTrue(
+        fs.getIsNamespaceEnabled(getTestTracingContext(fs, true)));
+    if (isCreate) {
+      // Assume that create client is DFS client.
+      Assume.assumeTrue(
+          AbfsServiceType.DFS.equals(
+              fs.getAbfsStore().getAbfsConfiguration().getIngressServiceType()));
+      // Assume that append blob is not enabled in DFS client.
+      Assume.assumeFalse(isAppendBlobEnabled());
+    }
+  }
+
+  /**
+   * Mocks the behavior of adding a client transaction ID to the request headers
+   * for the given AzureBlobFileSystem. This method generates a random transaction ID
+   * and adds it to the headers of the {@link AbfsDfsClient}.
+   *
+   * @param abfsDfsClient The {@link AbfsDfsClient} mocked AbfsDfsClient.
+   * @param clientTransactionId An array to hold the generated transaction ID.
+   */
+  protected void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient,
+      String[] clientTransactionId) {
+    Mockito.doAnswer(addClientTransactionId -> {
+      clientTransactionId[0] = UUID.randomUUID().toString();
+      List<AbfsHttpHeader> headers = addClientTransactionId.getArgument(0);
+      headers.add(
+          new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID,
+              clientTransactionId[0]));
+      return clientTransactionId[0];
+    }).when(abfsDfsClient).addClientTransactionIdToHeader(Mockito.anyList());
+  }
 }

Review Comment:
   Should we set this config to true for all the tests so that any regression 
can be caught when running our CI and PR validations?



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java:
##########
@@ -167,4 +187,163 @@ public void testMkdirWithExistingFilename() throws Exception {
     intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(new Path("/testFilePath")));
     intercept(FileAlreadyExistsException.class, () -> fs.mkdirs(new Path("/testFilePath/newDir")));
   }
+
+  /**
+   * Tests the idempotency of creating a path with retries by simulating
+   * a conflict response (HTTP 409) from the Azure Blob File System client.
+   * The method ensures that the path creation operation retries correctly
+   * with the proper transaction ID headers, verifying idempotency during
+   * failure recovery.
+   *
+   * @throws Exception if any error occurs during the operation.
+   */
+  @Test
+  public void createPathRetryIdempotency() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
+    try (AzureBlobFileSystem fs = getFileSystem(configuration)) {
+      assumeRecoveryThroughClientTransactionID(fs, true);
+      AbfsDfsClient abfsClient = mockIngressClientHandler(fs);
+      final Path nonOverwriteFile = new Path(
+          "/NonOverwriteTest_FileName_" + UUID.randomUUID());
+      final List<AbfsHttpHeader> headers = new ArrayList<>();
+      TestAbfsClient.mockAbfsOperationCreation(abfsClient,
+          new MockIntercept<AbfsRestOperation>() {
+            private int count = 0;
+
+            @Override
+            public void answer(final AbfsRestOperation mockedObj,
+                final InvocationOnMock answer)
+                throws AbfsRestOperationException {
+              if (count == 0) {
+                count = 1;
+                AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+                Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
+                Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
+                Mockito.doReturn(true).when(mockedObj).hasResult();
+                Mockito.doReturn(op).when(mockedObj).getResult();
+                Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode();
+                headers.addAll(mockedObj.getRequestHeaders());
+                throw new AbfsRestOperationException(HTTP_CONFLICT,
+                    AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), EMPTY_STRING,
+                    null, op);
+              }
+            }
+          });
+      AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
+      AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+      Mockito.doAnswer(answer -> {
+        String requiredHeader = null;
+        for (AbfsHttpHeader httpHeader : headers) {
+          if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(
+              httpHeader.getName())) {
+            requiredHeader = httpHeader.getValue();
+            break;
+          }
+        }
+        return requiredHeader;
+      }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
+      Mockito.doReturn(true).when(getPathRestOp).hasResult();
+      Mockito.doReturn(op).when(getPathRestOp).getResult();
+      Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
+          Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
+          Mockito.nullable(TracingContext.class),
+          Mockito.nullable(ContextEncryptionAdapter.class));
+      fs.create(nonOverwriteFile, false);
+    }
+  }
+
+  /**
+   * Test to verify that the client transaction ID is included in the response header
+   * during the creation of a new file in Azure Blob Storage.
+   * <p>
+   * This test ensures that when a new file is created, the Azure Blob FileSystem client
+   * correctly includes the client transaction ID in the response header for the created file.
+   * The test uses a configuration where client transaction ID is enabled and verifies
+   * its presence after the file creation operation.
+   * </p>
+   *
+   * @throws Exception if any error occurs during test execution
+   */
+  @Test
+  public void getClientTransactionIdAfterCreate() throws Exception {

Review Comment:
   Nit: Should we move all the create test cases to the 
`ITestAzureBlobFileSystemCreate` class?





> [ABFS] Rename/Create path idempotency client-level resolution
> -------------------------------------------------------------
>
>                 Key: HADOOP-19450
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19450
>             Project: Hadoop Common
>          Issue Type: Task
>          Components: fs/azure
>    Affects Versions: 3.5.0
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> CreatePath and RenamePath APIs are idempotent, as subsequent retries on the 
> same resource don't change the server state. However, when the client 
> experiences a connection break on the CreatePath and RenamePath APIs, it 
> cannot tell whether the server accepted the request. 
> On connection failure, the client retries the request. The server might 
> return 404 (sourceNotFound) for the RenamePath API and 409 
> (pathAlreadyExists) for the CreatePath (overwrite=false) API. The client 
> then has no path forward: in the CreatePath case, it doesn't know whether 
> the path was created by the original request or already existed from some 
> other request; in the RenamePath case, it doesn't know whether the source 
> was removed by the original try or was never there in the first place. 
>  
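The recovery scheme this issue proposes (and that the PR's tests exercise) can be sketched as a toy simulation: on a retried create that hits 409, the client consults the transaction ID the server recorded and treats the operation as a success if it matches its own. All names here are illustrative stand-ins; the real client sends the `x-ms-client-transaction-id` header and reads it back via GetPathStatus.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model of client-transaction-ID recovery; not the real ABFS API. */
public class CreateRecoverySketch {
  /** Fake server state: path -> transaction ID recorded at creation time. */
  static final Map<String, String> SERVER = new HashMap<>();

  /** Returns 201 on first creation, 409 if the path already exists. */
  static int createPath(String path, String clientTransactionId) {
    if (SERVER.containsKey(path)) {
      return 409; // pathAlreadyExists
    }
    SERVER.put(path, clientTransactionId);
    return 201;
  }

  /** Stand-in for GetPathStatus surfacing the stored transaction ID. */
  static String getPathStatusTransactionId(String path) {
    return SERVER.get(path);
  }

  /**
   * Create with one simulated retry: if the retry hits 409 but the stored
   * transaction ID is our own, the original request actually succeeded.
   */
  static boolean createWithRecovery(String path) {
    String txId = UUID.randomUUID().toString();
    int status = createPath(path, txId);
    // Simulate a connection break after the server applied the request:
    // the client never saw the 201 and retries with the same header.
    if (status == 201) {
      status = createPath(path, txId);
    }
    if (status == 409) {
      return txId.equals(getPathStatusTransactionId(path));
    }
    return status == 201;
  }

  public static void main(String[] args) {
    // Retry after a simulated connection break is recognized as a success.
    System.out.println(createWithRecovery("/NonOverwriteTest")); // true
    // A genuine conflict (another client created the path) is not recovered.
    SERVER.put("/existing", "other-client-tx-id");
    System.out.println(createWithRecovery("/existing")); // false
  }
}
```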



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
