[ https://issues.apache.org/jira/browse/HADOOP-19522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17945086#comment-17945086 ]
ASF GitHub Bot commented on HADOOP-19522: ----------------------------------------- bhattmanish98 commented on code in PR #7559: URL: https://github.com/apache/hadoop/pull/7559#discussion_r2047072088 ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. + */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. 
This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. + */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. 
+ * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. + * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. 
+ */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) Review Comment: Taken ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. 
+ */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. 
+ */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. + * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. 
+ * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(src, "file" + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } + + /** + * Helper method to create a json file. 
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + copyCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. 
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate delete operation count + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + deleteCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + // Validate that delete redo operation was triggered + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. 
After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); Review Comment: Before the rename starts, the source directory exists but neither the JSON file nor the destination exists. So it would be better to keep the JSON and source status checks separate. ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; 
+import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. 
+ */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. 
+ */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. + * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. 
+ * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(src, "file" + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } + + /** + * Helper method to create a json file. 
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + copyCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. 
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate delete operation count + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + deleteCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + // Validate that delete redo operation was triggered + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. 
After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameRecoveryWithMarkerPresentInDest() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // This will create marker file in the destination + fs.exists(dst); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Test to check behaviour when rename is called on a atomic rename directory + * for which rename pending json file is already present. 
+ * @throws Exception in case of failure + */ + @Test + public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Validate final state of destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Test case to verify crash recovery with a single child folder. + * + * This test simulates a scenario where a pending rename JSON file exists for a single child folder + * under the parent directory. It ensures that when listing the files in the parent directory, + * only the child folder (with the pending rename JSON file) is returned, and no additional files are listed. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithSingleChildFolder() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) Review Comment: Taken ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. + */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. 
This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. + */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. 
+ * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. + * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. 
+ */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(src, "file" + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } + + /** + * Helper method to create a json file. 
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + copyCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. 
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate delete operation count + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + deleteCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + // Validate that delete redo operation was triggered + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. 
After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameRecoveryWithMarkerPresentInDest() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // This will create marker file in the destination + fs.exists(dst); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); Review Comment: Updated this. ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. 
+ */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. 
+ */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. + * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. 
+ * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. + */ + public static void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(src, "file" + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } + + /** + * Helper method to create a json file. 
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + copyCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. 
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate delete operation count + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + deleteCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + // Validate that delete redo operation was triggered + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. 
After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since, destination path already exists, there will be adjustment in the + * destination path. After crash recovery, recovery should succeed even in the + * case when destination path already exists. + * Simulates an error when copying on the 6th call. 
+ */ + @Test + public void testRenameRecoveryWithMarkerPresentInDest() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // This will create marker file in the destination + fs.exists(dst); + + copyCall.set(0); + // List Status on src, this will internally do rename recovery + triggerRenameRecovery(fs, src); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Test to check behaviour when rename is called on a atomic rename directory + * for which rename pending json file is already present. 
+ * @throws Exception in case of failure + */ + @Test + public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(client, copyCall); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Validate final state of destination and source + validateRename(fs, src, dst, true, true, true); + + copyCall.set(0); + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should succeed.") + .isTrue(); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Test case to verify crash recovery with a single child folder. + * + * This test simulates a scenario where a pending rename JSON file exists for a single child folder + * under the parent directory. It ensures that when listing the files in the parent directory, + * only the child folder (with the pending rename JSON file) is returned, and no additional files are listed. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithSingleChildFolder() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 1 file") + .isEqualTo(1); + } finally { + if (fs != null) { + fs.close(); Review Comment: It is not required to close the mocked object. I have removed it. > ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists > with Destination Directory > ----------------------------------------------------------------------------------------------------- > > Key: HADOOP-19522 > URL: https://issues.apache.org/jira/browse/HADOOP-19522 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0, 3.4.1 > Reporter: Manish Bhatt > Assignee: Manish Bhatt > Priority: Blocker > Labels: pull-request-available > > On the blob endpoint, since rename is not a direct operation but a > combination of two operations—copy and delete—in the case of directory > rename, we first rename all the blobs that have the source prefix and, at the > end, rename the source to the destination. > In the normal rename flow, renaming is not allowed if the destination already > exists. However, in the case of recovery, there is a possibility that some > files have already been renamed from the source to the destination. With the > recent change ([HADOOP-19474] ABFS: [FnsOverBlob] Listing Optimizations to > avoid multiple iteration over list response. - ASF JIRA), where we create a > marker if the path is implicit, rename recovery will fail at the end when it > tries to rename the source to the destination after renaming all the files. 
> To fix this, while renaming the source to the destination, if we encounter an > error indicating that the destination path already exists, we will suppress the > error and mark the rename recovery as successful. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-issues-help@hadoop.apache.org