anujmodi2021 commented on code in PR #7559:
URL: https://github.com/apache/hadoop/pull/7559#discussion_r2047123608


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java:
##########
@@ -0,0 +1,860 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+    AbstractAbfsIntegrationTest {
+
+  /**
+   * Number of mocked copy/delete calls that are forwarded to the real client
+   * before the injected failure is thrown.
+   */
+  private static final int FAILED_CALL = 15;
+
+  /** Number of files created in the source directory of each rename test. */
+  private static final int TOTAL_FILES = 25;
+
+  /** Size of the thread pool used by {@link #createFiles}. */
+  private static final int TOTAL_THREADS_IN_POOL = 5;
+
+  /**
+   * Invokes the {@link AbstractAbfsIntegrationTest} constructor.
+   *
+   * @throws Exception if the parent constructor fails.
+   */
+  public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+  }
+
+  /**
+   * Triggers rename recovery by calling getPathStatus on the source path.
+   * This simulates a scenario where the rename operation was interrupted,
+   * and the system needs to recover the state of the source path.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to trigger recovery on.
+   * @throws Exception If an error occurs during the recovery process.
+   */
+  private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws 
Exception {
+    // Trigger rename recovery
+    TracingContext tracingContext = new TracingContext(
+        getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, 
null);
+    AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+        AbfsRestOperationException.class, () -> {
+          fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), 
true,
+              tracingContext, null);
+        }).getErrorCode();
+    Assertions.assertThat(errorCode)
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND);
+  }
+
+  /**
+   * Stubs {@code copyBlob} on the spied client so that the first
+   * {@link #FAILED_CALL} copy calls reach the real implementation and every
+   * later call fails with a {@code BLOB_ALREADY_EXISTS}
+   * {@link AbfsRestOperationException}, simulating a rename that crashes
+   * part-way through.
+   *
+   * <p>The threshold is checked with {@code >=} rather than {@code ==}. The
+   * tests configure more than one rename thread, so copy calls may run
+   * concurrently. The read and the increment of the counter are separate
+   * operations, so two threads can both observe {@code FAILED_CALL - 1} and
+   * step the counter past {@code FAILED_CALL}. An equality check would then
+   * never fire and the rename would not crash.
+   *
+   * @param client the spied AbfsBlobClient to stub.
+   * @param copyCall counter of copy calls forwarded to the real method; it is
+   * not incremented for calls that fail.
+   * @throws AzureBlobFileSystemException because the stubbing statement
+   * invokes {@code copyBlob}, which declares it.
+   */
+  private void renameCrashInBetween(AbfsBlobClient client,
+      AtomicInteger copyCall) throws AzureBlobFileSystemException {
+    Mockito.doAnswer(copyRequest -> {
+      if (copyCall.get() >= FAILED_CALL) {
+        throw new AbfsRestOperationException(
+            BLOB_ALREADY_EXISTS.getStatusCode(),
+            BLOB_ALREADY_EXISTS.getErrorCode(),
+            BLOB_ALREADY_EXISTS.getErrorMessage(),
+            new Exception());
+      }
+      copyCall.incrementAndGet();
+      return copyRequest.callRealMethod();
+    }).when(client).copyBlob(Mockito.any(Path.class),
+        Mockito.any(Path.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+  }
+
+  /**
+   * Helper method to create the configuration for the AzureBlobFileSystem.
+   *
+   * @return The configuration object.
+   */
+  private Configuration getConfig() {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+    config.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+    config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+    return config;
+  }
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and 
verification
+   * of client interactions in tests. It replaces the actual store and client 
with mocked versions.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Helper method to validate that the rename was successful and that the 
destination exists.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param dst The destination path.
+   * @param src The source path.
+   * @throws IOException If an I/O error occurs during the validation.
+   */
+  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+      boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws 
Exception {
+    // Validate pending JSON file status
+    assertPathStatus(fs,
+        new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+        "Pending JSON file");
+
+    // Validate source directory status
+    assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+    // Validate destination directory status
+    assertPathStatus(fs, dst, isDstExist, "Destination directory");
+  }
+
+  /**
+   * Asserts that {@code path} does or does not exist, using a getPathStatus
+   * call on the file system's blob client.
+   *
+   * <p>If the path should exist, the call must return HTTP 200. If it
+   * should not, the call must fail with
+   * {@link AzureServiceErrorCode#BLOB_PATH_NOT_FOUND}.
+   *
+   * @param fs the file system whose client is queried.
+   * @param path the path to check.
+   * @param shouldExist whether the path is expected to exist.
+   * @param description text identifying the path in assertion messages.
+   * @throws Exception if the status call fails unexpectedly or an assertion
+   * fails.
+   */
+  private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+      boolean shouldExist, String description) throws Exception {
+    TracingContext tracingContext = getTestTracingContext(fs, true);
+    AbfsBlobClient client = (AbfsBlobClient) fs.getAbfsClient();
+    String pathString = path.toUri().getPath();
+    // NOTE(review): the two branches call different getPathStatus overloads;
+    // confirm this is intentional.
+    if (shouldExist) {
+      int actualStatus = client.getPathStatus(
+              pathString, tracingContext, null, true)
+          .getResult().getStatusCode();
+      Assertions.assertThat(actualStatus)
+          .describedAs("%s should exist", description)
+          .isEqualTo(HTTP_OK);
+    } else {
+      AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+          AbfsRestOperationException.class, () -> {
+            client.getPathStatus(pathString, true, tracingContext, null);
+          }).getErrorCode();
+      Assertions.assertThat(errorCode)
+          .describedAs("%s should not exist", description)
+          .isEqualTo(BLOB_PATH_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Helper method to create files in the given directory.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for file creation.
+   * @param src The source path (directory).
+   * @param numFiles The number of files to create.
+   * @throws ExecutionException, InterruptedException If an error occurs 
during file creation.
+   */
+  public static void createFiles(AzureBlobFileSystem fs, Path src, int 
numFiles)
+      throws ExecutionException, InterruptedException {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final int iter = i;
+      Future future = executorService.submit(() ->
+          fs.create(new Path(src, "file" + iter + ".txt")));
+      futures.add(future);
+    }
+    for (Future future : futures) {
+      future.get();
+    }
+    executorService.shutdown();
+  }
+
+  /**
+   * Prepares a directory that looks like the source of an interrupted atomic
+   * rename. It creates {@code path/file.txt}, then runs only the
+   * {@link RenameAtomicity#preRename()} step for a rename of {@code path} to
+   * {@code /hbase/test4}. This writes the rename pending JSON file without
+   * invoking the rename itself.
+   *
+   * @param path the directory to create and mark as pending rename.
+   * @param renameJson the expected location of the rename pending JSON file.
+   * @return a spied file system whose store and client are spies as well.
+   * @throws IOException in case of failure.
+   */
+  private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
+      throws IOException {
+    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+    assumeBlobServiceType();
+    // Reuse the shared spy wiring instead of duplicating it here.
+    AbfsClient client = addSpyHooksOnClient(fs);
+
+    fs.setWorkingDirectory(new Path(ROOT_PATH));
+    // Close the stream right away so it is not leaked.
+    fs.create(new Path(path, "file.txt")).close();
+
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
+
+    new RenameAtomicity(path, new Path("/hbase/test4"),
+        renameJson, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client)
+        .preRename();
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should exist.")
+        .isTrue();
+
+    return fs;
+  }
+
+  /**
+   * Tests renaming a directory when copyBlob starts failing part-way through.
+   * After {@link #FAILED_CALL} copies reach the real client, every further
+   * copy fails. The rename must return false and leave the source, the
+   * destination and the rename pending JSON file in place. A getPathStatus
+   * call on the source must then redo the rename, after which only the
+   * destination remains.
+   */
+  @Test
+  public void testRenameCopyFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Only part of the files may have been copied before the crash
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than %d.",
+              TOTAL_FILES)
+          .isLessThan(TOTAL_FILES);
+
+      // Mid-rename: source, destination and pending JSON all exist
+      validateRename(fs, src, dst, true, true, true);
+
+      // Validate that rename redo operation was triggered
+      copyCall.set(0);
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests renaming a directory when deleteBlobPath starts failing part-way
+   * through. After {@link #FAILED_CALL} deletes reach the real client, every
+   * further delete fails with {@code BLOB_PATH_NOT_FOUND}. The rename must
+   * return false and leave the source, the destination and the rename
+   * pending JSON file in place. A getPathStatus call on the source must then
+   * redo the rename, after which only the destination remains.
+   */
+  @Test
+  public void testRenameDeleteFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of delete operations. ">=" rather than "==" because
+      // deletes may run on several threads: the read and increment below are
+      // not atomic, so the counter can skip past FAILED_CALL without any
+      // thread observing it exactly.
+      AtomicInteger deleteCall = new AtomicInteger(0);
+      Mockito.doAnswer(deleteRequest -> {
+        if (deleteCall.get() >= FAILED_CALL) {
+          throw new AbfsRestOperationException(
+              BLOB_PATH_NOT_FOUND.getStatusCode(),
+              BLOB_PATH_NOT_FOUND.getErrorCode(),
+              BLOB_PATH_NOT_FOUND.getErrorMessage(),
+              new Exception());
+        }
+        deleteCall.incrementAndGet();
+        return deleteRequest.callRealMethod();
+      }).when(client).deleteBlobPath(Mockito.any(Path.class),
+          Mockito.anyString(), Mockito.any(TracingContext.class));
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Only part of the files may have been deleted before the crash
+      Assertions.assertThat(deleteCall.get())
+          .describedAs("Delete operation count should be less than %d.",
+              TOTAL_FILES)
+          .isLessThan(TOTAL_FILES);
+
+      // Mid-rename: source, destination and pending JSON all exist
+      validateRename(fs, src, dst, true, true, true);
+
+      // Validate that rename redo operation was triggered
+      deleteCall.set(0);
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(deleteCall.get())
+          .describedAs("Delete operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests rename recovery when the destination directory already exists
+   * before the rename. Because of this, the effective destination path is
+   * adjusted. The rename is interrupted by copy failures after
+   * {@link #FAILED_CALL} copies. A getPathStatus call on the source must
+   * still complete the rename, leaving only the destination.
+   */
+  @Test
+  public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+      // Create the destination directory
+      fs.mkdirs(dst);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Only part of the files may have been copied before the crash
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than %d.",
+              TOTAL_FILES)
+          .isLessThan(TOTAL_FILES);
+
+      // Mid-rename: source, destination and pending JSON all exist
+      validateRename(fs, src, dst, true, true, true);
+
+      copyCall.set(0);
+      // getPathStatus on src internally performs the rename recovery
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests rename recovery when the destination is probed after the crash.
+   * The rename is interrupted by copy failures after {@link #FAILED_CALL}
+   * copies. Then {@code fs.exists(dst)} is called, which is expected to
+   * create a directory marker at the partially populated destination. A
+   * getPathStatus call on the source must still complete the rename,
+   * leaving only the destination.
+   */
+  @Test
+  public void testRenameRecoveryWithMarkerPresentInDest() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Only part of the files may have been copied before the crash
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than %d.",
+              TOTAL_FILES)
+          .isLessThan(TOTAL_FILES);
+
+      // This will create marker file in the destination
+      fs.exists(dst);
+
+      copyCall.set(0);
+      // getPathStatus on src internally performs the rename recovery
+      triggerRenameRecovery(fs, src);
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Tests calling rename again on an atomic-rename directory whose rename
+   * pending JSON file is still present after a crashed rename. The second
+   * rename must return true and perform further copies. Afterwards, only the
+   * destination remains.
+   *
+   * @throws Exception in case of failure
+   */
+  @Test
+  public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(client, copyCall);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should crash in between.")
+          .isFalse();
+
+      // Only part of the files may have been copied before the crash
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be less than %d.",
+              TOTAL_FILES)
+          .isLessThan(TOTAL_FILES);
+
+      // Validate intermediate state: source, destination and JSON all exist
+      validateRename(fs, src, dst, true, true, true);
+
+      copyCall.set(0);
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+
+      Assertions.assertThat(copyCall.get())
+          .describedAs("Copy operation count should be greater than 0.")
+          .isGreaterThan(0);
+
+      // Validate final state of destination and source
+      validateRename(fs, src, dst, false, true, false);
+    }
+  }
+
+  /**
+   * Test case to verify crash recovery with a single child folder.
+   *
+   * This test simulates a scenario where a pending rename JSON file exists 
for a single child folder
+   * under the parent directory. It ensures that when listing the files in the 
parent directory,
+   * only the child folder (with the pending rename JSON file) is returned, 
and no additional files are listed.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
+    AzureBlobFileSystem fs = null;
+    try {
+      Path path = new Path("/hbase/A1/A2");
+      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+      fs = createJsonFile(path, renameJson);
+
+      FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+      Assertions.assertThat(fileStatuses.length)

Review Comment:
   As per the discussion, the rename pending JSON path should not appear in the
list output. Please assert its absence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to