[ 
https://issues.apache.org/jira/browse/HADOOP-19522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17945090#comment-17945090
 ] 

ASF GitHub Bot commented on HADOOP-19522:
-----------------------------------------

anujmodi2021 commented on code in PR #7559:
URL: https://github.com/apache/hadoop/pull/7559#discussion_r2047110190


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java:
##########
@@ -0,0 +1,860 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+    AbstractAbfsIntegrationTest {
+
+  private static final int FAILED_CALL = 15;
+
+  private static final int TOTAL_FILES = 25;
+
+  private static final int TOTAL_THREADS_IN_POOL = 5;
+
+  public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+    super();
+  }
+
+  /**
+   * Triggers rename recovery by calling getPathStatus on the source path.
+   * This simulates a scenario where the rename operation was interrupted,
+   * and the system needs to recover the state of the source path.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to trigger recovery on.
+   * @throws Exception If an error occurs during the recovery process.
+   */
+  private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws 
Exception {
+    // Trigger rename recovery
+    TracingContext tracingContext = new TracingContext(
+        getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, 
null);
+    AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+        AbfsRestOperationException.class, () -> {
+          fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), 
true,
+              tracingContext, null);
+        }).getErrorCode();
+    Assertions.assertThat(errorCode)
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND);
+  }
+
+  /**
+   * Simulates a failure during the rename operation by throwing an exception
+   * when the copyBlob method is called. This is used to test the behavior of
+   * the rename recovery operation when a blob already exists at the 
destination.
+   *
+   * @param client The AbfsBlobClient instance.
+   * @param copyCall The AtomicInteger to track the number of copy calls.
+   * @throws AzureBlobFileSystemException If an error occurs during the 
operation.
+   */
+  private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger 
copyCall)
+      throws AzureBlobFileSystemException {
+    Mockito.doAnswer(copyRequest -> {
+      if (copyCall.get() == FAILED_CALL) {
+        throw new AbfsRestOperationException(
+            BLOB_ALREADY_EXISTS.getStatusCode(),
+            BLOB_ALREADY_EXISTS.getErrorCode(),
+            BLOB_ALREADY_EXISTS.getErrorMessage(),
+            new Exception());
+      }
+      copyCall.incrementAndGet();
+      return copyRequest.callRealMethod();
+    }).when(client).copyBlob(Mockito.any(Path.class),
+        Mockito.any(Path.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+  }
+
+  /**
+   * Helper method to create the configuration for the AzureBlobFileSystem.
+   *
+   * @return The configuration object.
+   */
+  private Configuration getConfig() {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+    config.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+    config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+    return config;
+  }
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and 
verification
+   * of client interactions in tests. It replaces the actual store and client 
with mocked versions.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Helper method to validate that the rename was successful and that the 
destination exists.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param dst The destination path.
+   * @param src The source path.
+   * @throws IOException If an I/O error occurs during the validation.
+   */
+  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+      boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws 
Exception {
+    // Validate pending JSON file status
+    assertPathStatus(fs,
+        new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+        "Pending JSON file");
+
+    // Validate source directory status
+    assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+    // Validate destination directory status
+    assertPathStatus(fs, dst, isDstExist, "Destination directory");
+  }
+
+  /**
+   * Helper method to assert the status of a path in the AzureBlobFileSystem.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param path The path to check.
+   * @param shouldExist Whether the path should exist or not.
+   * @param description A description for the assertion.
+   * @throws Exception If an error occurs during the assertion.
+   */
+  private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+      boolean shouldExist, String description) throws Exception{
+    TracingContext tracingContext = getTestTracingContext(fs, true);
+    AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient());
+    if (shouldExist) {
+      int actualStatus = client.getPathStatus(
+              path.toUri().getPath(), tracingContext,
+              null, true)
+          .getResult().getStatusCode();
+      Assertions.assertThat(actualStatus)
+          .describedAs("%s should exists", description)
+          .isEqualTo(HTTP_OK);
+    } else {
+      AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+          AbfsRestOperationException.class, () -> {
+            client.getPathStatus(path.toUri().getPath(), true,
+                tracingContext, null);
+          }).getErrorCode();
+      Assertions.assertThat(errorCode)
+          .describedAs("%s should not exists", description)
+          .isEqualTo(BLOB_PATH_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Helper method to create files in the given directory.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for file creation.
+   * @param src The source path (directory).
+   * @param numFiles The number of files to create.
+   * @throws ExecutionException, InterruptedException If an error occurs 
during file creation.
+   */
+  public static void createFiles(AzureBlobFileSystem fs, Path src, int 
numFiles)
+      throws ExecutionException, InterruptedException {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final int iter = i;
+      Future future = executorService.submit(() ->
+          fs.create(new Path(src, "file" + iter + ".txt")));
+      futures.add(future);
+    }
+    for (Future future : futures) {
+      future.get();
+    }
+    executorService.shutdown();
+  }
+
+  /**
+   * Helper method to create a json file.
+   * @param path parent path
+   * @param renameJson rename json path
+   * @return file system
+   * @throws IOException in case of failure
+   */
+  private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
+      throws IOException {
+    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+    assumeBlobServiceType();
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+
+    fs.setWorkingDirectory(new Path(ROOT_PATH));
+    fs.create(new Path(path, "file.txt"));
+
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
+
+    new RenameAtomicity(path, new Path("/hbase/test4"),
+        renameJson, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client)
+        .preRename();
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should exist.")
+        .isTrue();
+
+    return fs;
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * Simulates an error when copying on the 6th call.
+   */
+  @Test
+  public void testRenameCopyFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = 
Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Validate copy operation count
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than 10.")
+          .isLessThan(TOTAL_FILES);
+
+      // Assertions to validate renamed destination and source
+      validateRename(fs, src, dst, true, true, true);
+
+      // Validate that rename redo operation was triggered
+      copyCall.set(0);
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the delete operation.
+   * Simulates an error on the 6th delete operation and verifies the behavior.
+   */
+  @Test
+  public void testRenameDeleteFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = 
Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of delete operations
+      AtomicInteger deleteCall = new AtomicInteger(0);
+      Mockito.doAnswer(deleteRequest -> {
+        if (deleteCall.get() == FAILED_CALL) {
+          throw new AbfsRestOperationException(
+              BLOB_PATH_NOT_FOUND.getStatusCode(),
+              BLOB_PATH_NOT_FOUND.getErrorCode(),
+              BLOB_PATH_NOT_FOUND.getErrorMessage(),
+              new Exception());
+        }
+        deleteCall.incrementAndGet();
+        return deleteRequest.callRealMethod();
+      }).when(client).deleteBlobPath(Mockito.any(Path.class),
+          Mockito.anyString(), Mockito.any(TracingContext.class));
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Validate delete operation count
+      Assertions.assertThat(deleteCall.get())
+          .describedAs("Delete operation count should be less than 10.")
+          .isLessThan(TOTAL_FILES);
+
+      // Assertions to validate renamed destination and source
+      validateRename(fs, src, dst, true, true, true);
+
+      // Validate that rename redo operation was triggered
+      deleteCall.set(0);
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(deleteCall.get())
+          .describedAs("Delete operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      // Validate that delete redo operation was triggered
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * Since, destination path already exists, there will be adjustment in the
+   * destination path. After crash recovery, recovery should succeed even in 
the
+   * case when destination path already exists.
+   * Simulates an error when copying on the 6th call.
+   */
+  @Test
+  public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
+    try (AzureBlobFileSystem fs = 
Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+      // Create the destination directory
+      fs.mkdirs(dst);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Validate copy operation count
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than 10.")
+          .isLessThan(TOTAL_FILES);
+
+      // Assertions to validate renamed destination and source
+      validateRename(fs, src, dst, true, true, true);
+
+      copyCall.set(0);
+      // List Status on src, this will internally do rename recovery
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);

Review Comment:
   Makes sense. Are we validating that in any of the tests?





> ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists 
> with Destination Directory
> -----------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19522
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19522
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Blocker
>              Labels: pull-request-available
>
> On the blob endpoint, rename is not a direct operation but a combination of 
> two operations: copy and delete. In the case of a directory rename, we first 
> rename all the blobs that have the source prefix and, at the end, rename the 
> source to the destination.
> In the normal rename flow, renaming is not allowed if the destination already 
> exists. However, in the case of recovery, there is a possibility that some 
> files have already been renamed from the source to the destination. With the 
> recent change ([HADOOP-19474] ABFS: [FnsOverBlob] Listing Optimizations to 
> avoid multiple iteration over list response. - ASF JIRA), where we create a 
> marker if the path is implicit, rename recovery will fail at the end when it 
> tries to rename the source to the destination after renaming all the files.
> To fix this, while renaming the source to the destination, if we encounter an 
> error indicating that the path already exists, we will suppress the error and 
> mark the rename recovery as successful.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to