bhattmanish98 commented on code in PR #7559: URL: https://github.com/apache/hadoop/pull/7559#discussion_r2046357162
########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java: ########## @@ -0,0 +1,860 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; +import 
org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test rename recovery operation. + */ +public class ITestAzureBlobFileSystemRenameRecovery extends + AbstractAbfsIntegrationTest { + + private static final int FAILED_CALL = 15; + + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + public ITestAzureBlobFileSystemRenameRecovery() throws Exception { + super(); + } + + /** + * Triggers rename recovery by calling getPathStatus on the source path. + * This simulates a scenario where the rename operation was interrupted, + * and the system needs to recover the state of the source path. + * + * @param fs The AzureBlobFileSystem instance. + * @param src The source path to trigger recovery on. 
+ * @throws Exception If an error occurs during the recovery process. + */ + private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception { + // Trigger rename recovery + TracingContext tracingContext = new TracingContext( + getConfiguration().getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + AzureServiceErrorCode errorCode = LambdaTestUtils.intercept( + AbfsRestOperationException.class, () -> { + fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true, + tracingContext, null); + }).getErrorCode(); + Assertions.assertThat(errorCode) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND); + } + + /** + * Simulates a failure during the rename operation by throwing an exception + * when the copyBlob method is called. This is used to test the behavior of + * the rename recovery operation when a blob already exists at the destination. + * + * @param client The AbfsBlobClient instance. + * @param copyCall The AtomicInteger to track the number of copy calls. + * @throws AzureBlobFileSystemException If an error occurs during the operation. + */ + private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger copyCall) + throws AzureBlobFileSystemException { + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @return The configuration object. 
+ */ + private Configuration getConfig() { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5"); + config.set(FS_AZURE_CONSUMER_MAX_LAG, "3"); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2"); + return config; + } + + /** + * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification + * of client interactions in tests. It replaces the actual store and client with mocked versions. + * + * @param fs the AzureBlobFileSystem instance + * @return the spied AbfsClient for interaction verification + */ + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + return client; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. + */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception { + // Validate pending JSON file status + assertPathStatus(fs, + new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist, + "Pending JSON file"); + + // Validate source directory status + assertPathStatus(fs, src, isSrcExist, "Source directory"); + + // Validate destination directory status + assertPathStatus(fs, dst, isDstExist, "Destination directory"); + } + + /** + * Helper method to assert the status of a path in the AzureBlobFileSystem. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param path The path to check. 
+ * @param shouldExist Whether the path should exist or not. + * @param description A description for the assertion. + * @throws Exception If an error occurs during the assertion. + */ + private void assertPathStatus(AzureBlobFileSystem fs, Path path, + boolean shouldExist, String description) throws Exception{ + TracingContext tracingContext = getTestTracingContext(fs, true); + AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient()); + if (shouldExist) { + int actualStatus = client.getPathStatus( Review Comment: fs.exists internally triggers rename recovery if the path is the source directory of a rename. That is why this helper checks the path's existence on its own, and rename recovery is triggered separately as a distinct step. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org