bhattmanish98 commented on code in PR #7559: URL: https://github.com/apache/hadoop/pull/7559#discussion_r2048908242
########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,763 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import 
org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. + * @throws Exception If an error occurs during the recovery process. 
+ */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // getPathStatus on the source is expected to fail with PATH_NOT_FOUND, + // which this helper treats as proof that the pending rename was recovered. + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called, then runs the rename and its recovery + * checks. This is used to test the behavior of the rename recovery operation + * when a blob already exists at the destination. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to rename. + * @param dst The destination path for the rename operation. + * @param client The AbfsBlobClient instance whose copyBlob call is stubbed. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws Exception If the rename, the recovery or any of the assertions fails.
+ */ + private void renameCrashInBetween(AzureBlobFileSystem fs, Path src, Path dst, + AbfsBlobClient client, AtomicInteger copyCall) + throws Exception { + // copyBlob fails with BLOB_ALREADY_EXISTS whenever the counter equals FAILED_CALL; + // otherwise the call is counted and forwarded to the real client. + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + renameOperationWithRecovery(fs, src, dst, copyCall); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * Copies the raw test configuration and overrides the producer queue size, + * the consumer max lag and the blob directory rename thread count. + * + * @return The configuration object. + */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. The file system is stubbed to return the spied store, + * and the spied store is stubbed to return the spied client. + * + * @param fs the AzureBlobFileSystem instance; it must itself be a Mockito mock or spy, + * because getAbfsStore() is stubbed on it + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate whether the rename pending JSON file, the source + * and the destination exist, against the expected state of each. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param src The source path. + * @param dst The destination path. + * @param isSrcExist Whether the source path is expected to exist. + * @param isDstExist Whether the destination path is expected to exist. + * @param isJsonExist Whether the rename pending JSON file of the source is expected to exist.
+ * @throws Exception If a status check fails or does not match the expected state. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status (source name + SUFFIX, next to the source) + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * When the path should exist, getPathStatus on the blob client must return HTTP_OK; + * otherwise it must fail with BLOB_PATH_NOT_FOUND. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. + * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + // NOTE(review): the two branches pass getPathStatus arguments in different orders, + // presumably selecting different overloads - confirm this is intentional. + if (shouldExist) { + int actualStatus = client.getPathStatus( + path.toUri().getPath(), tracingContext, + null, true) + .getResult().getStatusCode(); + Assertions.assertThat(actualStatus) + .describedAs("%s should exists", description) + .isEqualTo(HTTP_OK); + } else { + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("%s should not exists", description) + .isEqualTo(BLOB_PATH_NOT_FOUND); + } + } + + /** + * Helper method that creates a file under the given path, runs + * RenameAtomicity#preRename for that path and asserts that the + * rename pending json file exists afterwards.
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Helper method to perform the rename operation and validate the results. + * + * @param fs The AzureBlobFileSystem instance to use for the rename operation. + * @param src The source path (directory). + * @param dst The destination path (directory). + * @param countCall The AtomicInteger to track the number of operations. + * @throws Exception If an error occurs during the rename operation. 
+ */ + private void renameOperationWithRecovery(AzureBlobFileSystem fs, Path src, + Path dst, AtomicInteger countCall) throws Exception { + Assertions.assertThat(fs.rename(src, dst)) + .describedAs("Rename should crash in between.") + .isFalse(); + + // Validate copy operation count + Assertions.assertThat(countCall.get()) + .describedAs("Operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, true, true, true); + + // Validate that rename redo operation was triggered + countCall.set(0); + triggerRenameRecovery(fs, src); + + Assertions.assertThat(countCall.get()) + .describedAs("Operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. 
+ */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + // deleteBlobPath fails with BLOB_PATH_NOT_FOUND whenever the counter equals FAILED_CALL; + // otherwise the call is counted and forwarded to the real client. + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + renameOperationWithRecovery(fs, src, dst, deleteCall); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Since the destination path already exists, there will be an adjustment in the + * destination path. After the crash, recovery should succeed even in the + * case when the destination path already exists. + * Simulates an error in copyBlob once FAILED_CALL copy calls have completed.
+ */ + @Test + public void testRenameRecoveryWhenDestAlreadyExist() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + // Create the destination directory + fs.mkdirs(dst); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + // Before the rename: source and destination exist, no pending JSON file yet + validateRename(fs, src, dst, true, true, false); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error in copyBlob once FAILED_CALL copy calls have completed. + * NOTE(review): as written here, this body is identical to + * testRenameCopyFailureInBetween and does not create anything at the + * destination before the rename - confirm the intended marker setup. + */ + @Test + public void testRenameRecoveryWithMarkerPresentInDest() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Test to check behaviour when rename is called on an atomic rename directory + * for which the rename pending json file is already present.
+ * @throws Exception in case of failure + */ + @Test + public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + renameCrashInBetween(fs, src, dst, client, copyCall); + } + } + + /** + * Helper method to assert that the pending JSON file does not exist + * and that the list of file statuses does not contain the rename pending JSON file. + * + * @param fs The AzureBlobFileSystem instance. + * @param renameJson The path of the rename pending JSON file. + * @param fileStatuses The array of FileStatus objects to check. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPendingJsonFile(AzureBlobFileSystem fs, + Path renameJson, FileStatus[] fileStatuses) throws Exception { + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should not exist.") + .isFalse(); + Assertions.assertThat( + Arrays.stream(fileStatuses) + .anyMatch(status -> + renameJson.toUri().getPath() + .equals(status.getPath().toUri().getPath()))) + .describedAs("Directory with suffix -RenamePending.json should exist.") Review Comment: Added source path check as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org