[
https://issues.apache.org/jira/browse/HADOOP-19522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17944940#comment-17944940
]
ASF GitHub Bot commented on HADOOP-19522:
-----------------------------------------
manika137 commented on code in PR #7559:
URL: https://github.com/apache/hadoop/pull/7559#discussion_r2046130007
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java:
##########
@@ -0,0 +1,860 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+ AbstractAbfsIntegrationTest {
+
+ private static final int FAILED_CALL = 15;
+
+ private static final int TOTAL_FILES = 25;
+
+ private static final int TOTAL_THREADS_IN_POOL = 5;
+
+ public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+ super();
+ }
+
+ /**
+ * Triggers rename recovery by calling getPathStatus on the source path.
+ * This simulates a scenario where the rename operation was interrupted,
+ * and the system needs to recover the state of the source path.
+ *
+ * @param fs The AzureBlobFileSystem instance.
+ * @param src The source path to trigger recovery on.
+ * @throws Exception If an error occurs during the recovery process.
+ */
+ private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws
Exception {
+ // Trigger rename recovery
+ TracingContext tracingContext = new TracingContext(
+ getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+ FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
null);
+ AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+ AbfsRestOperationException.class, () -> {
+ fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(),
true,
+ tracingContext, null);
+ }).getErrorCode();
+ Assertions.assertThat(errorCode)
+ .describedAs("Path had to be recovered from atomic rename operation.")
+ .isEqualTo(PATH_NOT_FOUND);
+ }
+
+ /**
+ * Simulates a failure during the rename operation by throwing an exception
+ * when the copyBlob method is called. This is used to test the behavior of
+ * the rename recovery operation when a blob already exists at the
destination.
+ *
+ * @param client The AbfsBlobClient instance.
+ * @param copyCall The AtomicInteger to track the number of copy calls.
+ * @throws AzureBlobFileSystemException If an error occurs during the
operation.
+ */
+ private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger
copyCall)
+ throws AzureBlobFileSystemException {
+ Mockito.doAnswer(copyRequest -> {
+ if (copyCall.get() == FAILED_CALL) {
+ throw new AbfsRestOperationException(
+ BLOB_ALREADY_EXISTS.getStatusCode(),
+ BLOB_ALREADY_EXISTS.getErrorCode(),
+ BLOB_ALREADY_EXISTS.getErrorMessage(),
+ new Exception());
+ }
+ copyCall.incrementAndGet();
+ return copyRequest.callRealMethod();
+ }).when(client).copyBlob(Mockito.any(Path.class),
+ Mockito.any(Path.class), Mockito.nullable(String.class),
+ Mockito.any(TracingContext.class));
+ }
+
+ /**
+ * Helper method to create the configuration for the AzureBlobFileSystem.
+ *
+ * @return The configuration object.
+ */
+ private Configuration getConfig() {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+ config.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+ config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+ return config;
+ }
+
+ /**
+ * Spies on the AzureBlobFileSystem's store and client to enable mocking and
verification
+ * of client interactions in tests. It replaces the actual store and client
with mocked versions.
+ *
+ * @param fs the AzureBlobFileSystem instance
+ * @return the spied AbfsClient for interaction verification
+ */
+ private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ AbfsClient client = Mockito.spy(store.getClient());
+ Mockito.doReturn(client).when(store).getClient();
+ return client;
+ }
+
+ /**
+ * Helper method to validate that the rename was successful and that the
destination exists.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param dst The destination path.
+ * @param src The source path.
+ * @throws IOException If an I/O error occurs during the validation.
+ */
+ private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+ boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws
Exception {
+ // Validate pending JSON file status
+ assertPathStatus(fs,
+ new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+ "Pending JSON file");
+
+ // Validate source directory status
+ assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+ // Validate destination directory status
+ assertPathStatus(fs, dst, isDstExist, "Destination directory");
+ }
+
+ /**
+ * Helper method to assert the status of a path in the AzureBlobFileSystem.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param path The path to check.
+ * @param shouldExist Whether the path should exist or not.
+ * @param description A description for the assertion.
+ * @throws Exception If an error occurs during the assertion.
+ */
+ private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+ boolean shouldExist, String description) throws Exception{
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient());
+ if (shouldExist) {
+ int actualStatus = client.getPathStatus(
Review Comment:
Is there any reason for using getPathStatus here instead of calling
fs.exists(path) directly?
We could shorten it directly to
Assertions.assertThat(fs.exists(path)).isTrue(); else
> ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists
> with Destination Directory
> -----------------------------------------------------------------------------------------------------
>
> Key: HADOOP-19522
> URL: https://issues.apache.org/jira/browse/HADOOP-19522
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Blocker
> Labels: pull-request-available
>
> On the blob endpoint, rename is not a direct operation but a combination of
> two operations—copy and delete. In the case of a directory rename, we first
> rename all the blobs that have the source prefix and, at the end, rename the
> source to the destination.
> In the normal rename flow, renaming is not allowed if the destination already
> exists. However, in the case of recovery, there is a possibility that some
> files have already been renamed from the source to the destination. With the
> recent change ([HADOOP-19474] ABFS: [FnsOverBlob] Listing Optimizations to
> avoid multiple iteration over list response. - ASF JIRA), where we create a
> marker if the path is implicit, the destination directory marker may already
> exist, so rename recovery will fail at the end when it tries to rename the
> source to the destination after renaming all the files.
> To fix this, while renaming the source to the destination, if we encounter an
> error indicating that the path already exists, we will suppress the error and
> mark the rename recovery as successful.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org