This is an automated email from the ASF dual-hosted git repository. anujmodi pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 2814d917085 HADOOP-19450: [ABFS] Rename/Create path idempotency client-level resolution (#7364) (#7436) 2814d917085 is described below commit 2814d917085bbd67009f02a899bcf3fcea8da39d Author: Manish Bhatt <52626736+bhattmanis...@users.noreply.github.com> AuthorDate: Thu Feb 27 03:34:02 2025 -0800 HADOOP-19450: [ABFS] Rename/Create path idempotency client-level resolution (#7364) (#7436) Contributed by Manish Bhatt --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 + .../fs/azurebfs/AzureBlobFileSystemStore.java | 9 +- .../fs/azurebfs/constants/AbfsHttpConstants.java | 3 +- .../fs/azurebfs/constants/ConfigurationKeys.java | 2 + .../constants/FileSystemConfigurations.java | 2 + .../constants/HttpHeaderConfigurations.java | 6 + .../contracts/exceptions/AbfsDriverException.java | 8 + .../fs/azurebfs/services/AbfsBlobClient.java | 16 +- .../hadoop/fs/azurebfs/services/AbfsClient.java | 28 ++- .../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 87 +++++++- .../hadoop/fs/azurebfs/services/AbfsErrors.java | 4 + .../fs/azurebfs/AbstractAbfsIntegrationTest.java | 27 +++ .../azurebfs/ITestAzureBlobFileSystemCreate.java | 228 +++++++++++++++++++++ .../azurebfs/ITestAzureBlobFileSystemRename.java | 189 +++++++++++++++++ .../apache/hadoop/fs/azurebfs/MockIntercept.java | 43 ++++ .../fs/azurebfs/services/AbfsClientTestUtil.java | 22 ++ .../fs/azurebfs/services/TestAbfsClient.java | 39 ++++ .../services/TestAbfsRenameRetryRecovery.java | 43 +++- 18 files changed, 731 insertions(+), 33 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 11d57bf9756..b9c951a787d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -440,6 +440,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME) private long maxApacheHttpClientConnectionIdleTime; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, + DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID) + private boolean enableClientTransactionId; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1070,6 +1074,10 @@ public long getMaxApacheHttpClientConnectionIdleTime() { return maxApacheHttpClientConnectionIdleTime; } + public boolean getIsClientTransactionIdEnabled() { + return enableClientTransactionId; + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e42343a6e83..867edfd1438 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -54,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; @@ -147,7 +148,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; -import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; @@ -1876,7 +1876,7 @@ private long extractContentLength(AbfsHttpOperation op) { long contentLength; String contentLengthHeader = op.getResponseHeader( HttpHeaderConfigurations.CONTENT_LENGTH); - if (!contentLengthHeader.equals(EMPTY_STRING)) { + if (!StringUtils.isEmpty(contentLengthHeader)) { contentLength = Long.parseLong(contentLengthHeader); } else { contentLength = 0; @@ -2161,6 +2161,11 @@ void setClient(AbfsClient client) { this.client = client; } + @VisibleForTesting + void setClientHandler(AbfsClientHandler clientHandler) { + this.clientHandler = clientHandler; + } + @VisibleForTesting DataBlocks.BlockFactory getBlockFactory() { return blockFactory; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 20ccd9fd871..b2eafd63cb1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -184,7 +184,8 @@ public enum ApiVersion { DEC_12_2019("2019-12-12"), APR_10_2021("2021-04-10"), - AUG_03_2023("2023-08-03"); + AUG_03_2023("2023-08-03"), + NOV_04_2024("2024-11-04"); private final String xMsApiVersion; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 9c7ab42f370..3742361b484 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -383,6 +383,8 @@ public static String accountProperty(String property, String account) { public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread"; /**Maximum number of thread per blob-delete orchestration: {@value}*/ public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread"; + /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/ + public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 370787a69cc..6072fcbb6fa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -198,5 +198,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index e5909d22f48..b442b1f8533 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -132,5 +132,11 @@ public final class HttpHeaderConfigurations { */ public static final String X_MS_COPY_STATUS = "x-ms-copy-status"; + /** + * Http Request Header for create rename idempotence. + * {@value} + */ + public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id"; + private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java index a7635e893c5..7b2d03d6923 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java @@ -51,4 +51,12 @@ public AbfsDriverException(final Exception innerException, final String activity : ERROR_MESSAGE + ", rId: " + activityId, null); } + + public AbfsDriverException(final String errorMessage, final Exception innerException) { + super( + AzureServiceErrorCode.UNKNOWN.getStatusCode(), + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + errorMessage, + innerException); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index b54ce1a4dac..0065cee7dcb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -814,10 +814,9 @@ destination, sourceEtag, 
isAtomicRenameKey(source), tracingContext final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - final AbfsRestOperation successOp = getAbfsRestOperation( + final AbfsRestOperation successOp = getSuccessOp( AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT, url, requestHeaders); - successOp.hardSetResult(HTTP_OK); return new AbfsClientRenameResult(successOp, true, false); } else { throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR, @@ -1208,9 +1207,9 @@ public AbfsRestOperation getPathStatus(final String path, if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) { // Implicit path found. - AbfsRestOperation successOp = getAbfsRestOperation( - AbfsRestOperationType.GetPathStatus, - HTTP_METHOD_HEAD, url, requestHeaders); + AbfsRestOperation successOp = getSuccessOp( + AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD, + url, requestHeaders); successOp.hardSetGetFileStatusResult(HTTP_OK); return successOp; } @@ -1308,11 +1307,8 @@ public AbfsRestOperation deletePath(final String path, = createDefaultUriQueryBuilder(); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - final AbfsRestOperation successOp = getAbfsRestOperation( - AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE, - url, requestHeaders); - successOp.hardSetResult(HTTP_OK); - return successOp; + return getSuccessOp(AbfsRestOperationType.DeletePath, + HTTP_METHOD_DELETE, url, requestHeaders); } else { throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR, AzureServiceErrorCode.UNKNOWN.getErrorCode(), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index bbbea75a185..5143a4a1954 
100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -216,6 +216,11 @@ private AbfsClient(final URL baseUrl, encryptionType = EncryptionType.GLOBAL_KEY; } + // Version update needed to support x-ms-client-transaction-id header + if (abfsConfiguration.getIsClientTransactionIdEnabled()) { + xMsVersion = ApiVersion.NOV_04_2024; + } + String sslProviderName = null; if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) { @@ -942,14 +947,12 @@ public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) { && DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) { // Server has returned HTTP 404, which means path no longer // exists. Assuming delete result to be idempotent, return success. - final AbfsRestOperation successOp = getAbfsRestOperation( + LOG.debug("Returning success response from delete idempotency logic"); + return getSuccessOp( AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE, op.getUrl(), op.getRequestHeaders()); - successOp.hardSetResult(HttpURLConnection.HTTP_OK); - LOG.debug("Returning success response from delete idempotency logic"); - return successOp; } return op; @@ -1737,4 +1740,21 @@ public abstract Hashtable<String, String> getXMSProperties(AbfsHttpOperation res * @throws UnsupportedEncodingException if decoding fails */ public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException; + + /** + * Get the dummy success operation. 
+ * @param operationType type of the operation + * @param httpMethod http method + * @param url url to be used + * @param requestHeaders list of headers to be sent with the request + * @return success operation + */ + protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationType, + final String httpMethod, final URL url, + final List<AbfsHttpHeader> requestHeaders) { + final AbfsRestOperation successOp = getAbfsRestOperation( + operationType, httpMethod, url, requestHeaders); + successOp.hardSetResult(HttpURLConnection.HTTP_OK); + return successOp; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f571cb86cca..05acaa78f48 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -109,6 +111,7 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD; @@ -134,7 +137,9 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.UNAUTHORIZED_BLOB_OVERWRITE; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_FILE_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY; /** * AbfsClient interacting with the DFS Endpoint. @@ -383,6 +388,9 @@ public AbfsRestOperation createPath(final String path, requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); } + // Add the client transaction ID to the request headers. + String clientTransactionId = addClientTransactionIdToHeader(requestHeaders); + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? 
FILE : DIRECTORY); if (isAppendBlob) { @@ -405,11 +413,34 @@ public AbfsRestOperation createPath(final String path, if (!op.hasResult()) { throw ex; } - if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - String existingResource = - op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE); - if (existingResource != null && existingResource.equals(DIRECTORY)) { - return op; //don't throw ex on mkdirs for existing directory + if (!isFile) { + if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + String existingResource = + op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE); + if (existingResource != null && existingResource.equals(DIRECTORY)) { + //don't throw ex on mkdirs for existing directory + return getSuccessOp(AbfsRestOperationType.CreatePath, + HTTP_METHOD_PUT, url, requestHeaders); + } + } + } else { + // recovery using client transaction id only if it is a retried request. + if (op.isARetriedRequest() && clientTransactionId != null + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT + || op.getResult().getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED)) { + try { + final AbfsHttpOperation getPathStatusOp = + getPathStatus(path, false, + tracingContext, null).getResult(); + if (clientTransactionId.equals( + getPathStatusOp.getResponseHeader( + X_MS_CLIENT_TRANSACTION_ID))) { + return getSuccessOp(AbfsRestOperationType.CreatePath, + HTTP_METHOD_PUT, url, requestHeaders); + } + } catch (AzureBlobFileSystemException exception) { + throw new AbfsDriverException(ERR_CREATE_RECOVERY, exception); + } } } throw ex; @@ -681,6 +712,9 @@ public AbfsClientRenameResult renamePath( requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + // Add the client transaction ID to the request headers. 
+ String clientTransactionId = addClientTransactionIdToHeader(requestHeaders); + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); appendSASTokenToQuery(destination, @@ -705,11 +739,33 @@ public AbfsClientRenameResult renamePath( throw e; } + // recovery using client transaction id only if it is a retried request. + if (op.isARetriedRequest() && clientTransactionId != null + && SOURCE_PATH_NOT_FOUND.getErrorCode().equalsIgnoreCase( + op.getResult().getStorageErrorCode())) { + try { + final AbfsHttpOperation abfsHttpOperation = + getPathStatus(destination, false, + tracingContext, null).getResult(); + if (clientTransactionId.equals( + abfsHttpOperation.getResponseHeader( + X_MS_CLIENT_TRANSACTION_ID))) { + return new AbfsClientRenameResult( + getSuccessOp(AbfsRestOperationType.RenamePath, + HTTP_METHOD_PUT, url, requestHeaders), true, + isMetadataIncompleteState); + } + } catch (AzureBlobFileSystemException exception) { + throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception); + } + throw e; + } + // ref: HADOOP-19393. Write permission checks can occur before validating // rename operation's validity. If there is an existing destination path, it may be rejected // with an authorization error. Catching and throwing FileAlreadyExistsException instead. if (op.getResult().getStorageErrorCode() - .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())){ + .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())) { throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS); } @@ -1589,4 +1645,23 @@ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsPro return properties; } + + /** + * Add the client transaction id to the request header + * if {@link AbfsConfiguration#getIsClientTransactionIdEnabled()} is enabled. 
+ * @param requestHeaders list of headers to be sent with the request + * + * @return client transaction id + */ + @VisibleForTesting + public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders) { + String clientTransactionId = null; + // Set client transaction ID if the namespace and client transaction ID config are enabled. + if (getIsNamespaceEnabled() && getAbfsConfiguration().getIsClientTransactionIdEnabled()) { + clientTransactionId = UUID.randomUUID().toString(); + requestHeaders.add( + new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, clientTransactionId)); + } + return clientTransactionId; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index 87c8869d6c6..e75df046d8d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -69,5 +69,9 @@ public final class AbfsErrors { "FNS-Blob delete was not successful for path: "; public static final String ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION = "Path had to be recovered from atomic rename operation."; + public static final String ERR_CREATE_RECOVERY = + "Error while recovering from create failure."; + public static final String ERR_RENAME_RECOVERY = + "Error while recovering from rename failure."; private AbfsErrors() {} } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 4003da49e51..e31df5eec65 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -733,4 +733,31 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV } assertEquals(exceptionCaught, exceptionVal); } + + /** + * Assumes that recovery through client transaction ID is enabled. + * Namespace is enabled for the given AzureBlobFileSystem. + * Service type is DFS. + * Assumes that the client transaction ID is enabled in the configuration. + * + * @throws IOException in case of an error + */ + protected void assumeRecoveryThroughClientTransactionID(boolean isCreate) + throws IOException { + // Assumes that recovery through client transaction ID is enabled. + Assume.assumeTrue("Recovery through client transaction ID is not enabled", + getConfiguration().getIsClientTransactionIdEnabled()); + // Assumes that service type is DFS. + assumeDfsServiceType(); + // Assumes that namespace is enabled for the given AzureBlobFileSystem. + assumeHnsEnabled(); + if (isCreate) { + // Assume that create client is DFS client. + Assume.assumeTrue("Ingress service type is not DFS", + AbfsServiceType.DFS.equals(getIngressServiceType())); + // Assume that append blob is not enabled in DFS client. 
+ Assume.assumeFalse("Append blob is enabled in DFS client", + isAppendBlobEnabled()); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index b3314894d3f..f59c4a8b9f2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -50,17 +50,22 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; +import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; +import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper; import 
org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; @@ -70,16 +75,23 @@ import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.ReflectionUtils; +import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT; import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; +import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY; import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -2062,4 +2074,220 @@ private String extractFileEtag(String fileName) throws IOException { op = client.getPathStatus(fileName, true, 
testTracingContext, null); return AzureBlobFileSystemStore.extractEtagHeader(op.getResult()); } + + /** + * Tests the idempotency of creating a path with retries by simulating + * a conflict response (HTTP 409) from the Azure Blob File System client. + * The method ensures that the path creation operation retries correctly + * with the proper transaction ID headers, verifying idempotency during + * failure recovery. + * + * @throws Exception if any error occurs during the operation. + */ + @Test + public void testCreatePathRetryIdempotency() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true"); + try (AzureBlobFileSystem fs = getFileSystem(configuration)) { + assumeRecoveryThroughClientTransactionID(true); + AbfsDfsClient abfsClient = mockIngressClientHandler(fs); + final Path nonOverwriteFile = new Path( + "/NonOverwriteTest_FileName_" + UUID.randomUUID()); + final List<AbfsHttpHeader> headers = new ArrayList<>(); + mockRetriedRequest(abfsClient, headers); + AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class); + AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doAnswer(answer -> { + String requiredHeader = null; + for (AbfsHttpHeader httpHeader : headers) { + if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase( + httpHeader.getName())) { + requiredHeader = httpHeader.getValue(); + break; + } + } + return requiredHeader; + }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID); + Mockito.doReturn(true).when(getPathRestOp).hasResult(); + Mockito.doReturn(op).when(getPathRestOp).getResult(); + Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus( + Mockito.nullable(String.class), Mockito.nullable(Boolean.class), + Mockito.nullable(TracingContext.class), + Mockito.nullable(ContextEncryptionAdapter.class)); + fs.create(nonOverwriteFile, false); + } + } + + /** + * Test to verify that the client transaction ID is 
included in the response header + * during the creation of a new file in Azure Blob Storage. + * + * This test ensures that when a new file is created, the Azure Blob FileSystem client + * correctly includes the client transaction ID in the response header for the created file. + * The test uses a configuration where client transaction ID is enabled and verifies + * its presence after the file creation operation. + * + * @throws Exception if any error occurs during test execution + */ + @Test + public void testGetClientTransactionIdAfterCreate() throws Exception { + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(true); + final String[] clientTransactionId = new String[1]; + AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs); + mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); + final Path nonOverwriteFile = new Path( + "/NonOverwriteTest_FileName_" + UUID.randomUUID()); + fs.create(nonOverwriteFile, false); + + final AbfsHttpOperation getPathStatusOp = + abfsDfsClient.getPathStatus(nonOverwriteFile.toUri().getPath(), false, + getTestTracingContext(fs, true), null).getResult(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction ID should be set during create") + .isNotNull(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction ID should be equal to the one set in the header") + .isEqualTo(clientTransactionId[0]); + } + } + + /** + * Test to verify that the client transaction ID is included in the response header + * after two consecutive create operations on the same file in Azure Blob Storage. + * + * This test ensures that even after performing two create operations (with overwrite) + * on the same file, the Azure Blob FileSystem client includes the client transaction ID + * in the response header for the created file. 
The test checks for the presence of + * the client transaction ID in the response after the second create call. + * + * @throws Exception if any error occurs during test execution + */ + @Test + public void testClientTransactionIdAfterTwoCreateCalls() throws Exception { + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(true); + final String[] clientTransactionId = new String[1]; + AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs); + mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); + Path testPath = path("testfile"); + AzureBlobFileSystemStore.Permissions permissions + = new AzureBlobFileSystemStore.Permissions(false, + FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + fs.create(testPath, false); + fs.create(testPath, true); + final AbfsHttpOperation getPathStatusOp = + abfsDfsClient.getPathStatus(testPath.toUri().getPath(), false, + getTestTracingContext(fs, true), null).getResult(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction ID should be set during create") + .isNotNull(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction ID should be equal to the one set in the header") + .isEqualTo(clientTransactionId[0]); + } + } + + /** + * Test case to simulate a failure scenario during the recovery process while + * creating a file in Azure Blob File System. This test verifies that when the + * `getPathStatus` method encounters a timeout exception during recovery, it + * triggers an appropriate failure response. + * + * The test mocks the `AbfsDfsClient` to simulate the failure behavior, including + * a retry logic. It also verifies that an exception is correctly thrown and the + * error message contains the expected details for recovery failure. + * + * @throws Exception If an error occurs during the test setup or execution. 
+ */ + @Test + public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception { + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(true); + final String[] clientTransactionId = new String[1]; + AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs); + mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); + mockRetriedRequest(abfsDfsClient, new ArrayList<>()); + boolean[] flag = new boolean[1]; + Mockito.doAnswer(getPathStatus -> { + if (!flag[0]) { + flag[0] = true; + throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception()); + } + return getPathStatus.callRealMethod(); + }).when(abfsDfsClient).getPathStatus( + Mockito.nullable(String.class), Mockito.nullable(Boolean.class), + Mockito.nullable(TracingContext.class), + Mockito.nullable(ContextEncryptionAdapter.class)); + + final Path nonOverwriteFile = new Path( + "/NonOverwriteTest_FileName_" + UUID.randomUUID()); + String errorMessage = intercept(AbfsDriverException.class, + () -> fs.create(nonOverwriteFile, false)).getErrorMessage(); + + Assertions.assertThat(errorMessage) + .describedAs("getPathStatus should fail while recovering") + .contains(ERR_CREATE_RECOVERY); + } + } + + /** + * Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem. + * This method sets up the necessary mock behavior for the client handler and ingress client. + * + * @param fs The {@link AzureBlobFileSystem} instance for which the client handler will be mocked. + * @return A mocked {@link AbfsDfsClient} instance associated with the provided file system. 
+ */ + private AbfsDfsClient mockIngressClientHandler(AzureBlobFileSystem fs) { + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy( + clientHandler.getClient()); + fs.getAbfsStore().setClient(abfsDfsClient); + fs.getAbfsStore().setClientHandler(clientHandler); + Mockito.doReturn(abfsDfsClient).when(clientHandler).getIngressClient(); + return abfsDfsClient; + } + + /** + * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts + * the Abfs operation and simulates an HTTP conflict (HTTP 409) error on the + * first invocation. It creates a mock HTTP operation with a PUT method and + * specific status codes and error messages. + * + * @param abfsDfsClient The AbfsDfsClient to mock operations for. + * @param headers The list of HTTP headers to which request headers will be added. + * + * @throws Exception If an error occurs during mock creation or operation execution. 
+ */ + private void mockRetriedRequest(AbfsDfsClient abfsDfsClient, + final List<AbfsHttpHeader> headers) throws Exception { + TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient, + new MockIntercept<AbfsRestOperation>() { + private int count = 0; + + @Override + public void answer(final AbfsRestOperation mockedObj, + final InvocationOnMock answer) + throws AbfsRestOperationException { + if (count == 0) { + count = 1; + AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod(); + Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage(); + Mockito.doReturn(true).when(mockedObj).hasResult(); + Mockito.doReturn(op).when(mockedObj).getResult(); + Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode(); + headers.addAll(mockedObj.getRequestHeaders()); + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), EMPTY_STRING, + null, op); + } + } + }); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index abd45eae0e0..afb66c054fc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -33,6 +33,7 @@ import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -43,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; @@ -50,12 +52,14 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.BlobRenameHandler; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils; +import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.statistics.IOStatisticAssertions; @@ -63,6 +67,7 @@ import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.functional.FunctionRaisingIOE; +import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT; import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_FORBIDDEN; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; @@ -70,11 +75,19 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY; import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; @@ -1641,4 +1654,180 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() Mockito.any(TracingContext.class)); fs.rename(new Path(dirPathStr), new Path("/dst/")); } + + /** + * Test to verify the idempotency of the `rename` operation in Azure Blob File System when retrying + * after a failure. The test simulates a "path not found" error (HTTP 404) on the first attempt, + * checks that the operation correctly retries using the appropriate transaction ID, + * and ensures that the source file is renamed to the destination path once successful. 
+ * + * @throws Exception if an error occurs during the file system operations or mocking + */ + @Test + public void testRenamePathRetryIdempotency() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true"); + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(false); + AbfsDfsClient abfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient()); + fs.getAbfsStore().setClient(abfsClient); + Path sourceDir = path("/testSrc"); + assertMkdirs(fs, sourceDir); + String filename = "file1"; + Path sourceFilePath = new Path(sourceDir, filename); + touch(sourceFilePath); + Path destFilePath = new Path(sourceDir, "file2"); + + final List<AbfsHttpHeader> headers = new ArrayList<>(); + mockRetriedRequest(abfsClient, headers); + + AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class); + AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doAnswer(answer -> { + String requiredHeader = null; + for (AbfsHttpHeader httpHeader : headers) { + if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase( + httpHeader.getName())) { + requiredHeader = httpHeader.getValue(); + break; + } + } + return requiredHeader; + }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID); + Mockito.doReturn(true).when(getPathRestOp).hasResult(); + Mockito.doReturn(op).when(getPathRestOp).getResult(); + Mockito.doReturn(DIRECTORY) + .when(op) + .getResponseHeader(X_MS_RESOURCE_TYPE); + Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus( + Mockito.nullable(String.class), Mockito.nullable(Boolean.class), + Mockito.nullable(TracingContext.class), + Mockito.nullable(ContextEncryptionAdapter.class)); + fs.rename(sourceFilePath, destFilePath); + } + } + + /** + * Test to verify that the client transaction ID is included in the response header + * after renaming a file in Azure Blob Storage. 
+ * + * This test ensures that when a file is renamed, the Azure Blob FileSystem client + * properly includes the client transaction ID in the response header for the renamed file. + * The test uses a configuration where client transaction ID is enabled and verifies + * its presence after performing a rename operation. + * + * @throws Exception if any error occurs during test execution + */ + @Test + public void testGetClientTransactionIdAfterRename() throws Exception { + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(false); + AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient()); + fs.getAbfsStore().setClient(abfsDfsClient); + final String[] clientTransactionId = new String[1]; + mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); + Path sourceDir = path("/testSrc"); + assertMkdirs(fs, sourceDir); + String filename = "file1"; + Path sourceFilePath = new Path(sourceDir, filename); + touch(sourceFilePath); + Path destFilePath = new Path(sourceDir, "file2"); + fs.rename(sourceFilePath, destFilePath); + + final AbfsHttpOperation getPathStatusOp = + abfsDfsClient.getPathStatus(destFilePath.toUri().getPath(), false, + getTestTracingContext(fs, true), null).getResult(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction id should be present in dest file") + .isNotNull(); + Assertions.assertThat( + getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID)) + .describedAs("Client transaction ID should be equal to the one set in the header") + .isEqualTo(clientTransactionId[0]); + } + } + + /** + * Tests the recovery process during a file rename operation in Azure Blob File System when + * the `getPathStatus` method encounters a timeout exception. The test ensures that the proper + * error message is returned when the operation fails during recovery. 
+ * + * @throws Exception If an error occurs during the test setup or execution. + */ + @Test + public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception { + try (AzureBlobFileSystem fs = getFileSystem()) { + assumeRecoveryThroughClientTransactionID(false); + AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient()); + fs.getAbfsStore().setClient(abfsDfsClient); + final String[] clientTransactionId = new String[1]; + mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId); + mockRetriedRequest(abfsDfsClient, new ArrayList<>()); + boolean[] flag = new boolean[1]; + Mockito.doAnswer(getPathStatus -> { + if (!flag[0]) { + flag[0] = true; + throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception()); + } + return getPathStatus.callRealMethod(); + }).when(abfsDfsClient).getPathStatus( + Mockito.nullable(String.class), Mockito.nullable(Boolean.class), + Mockito.nullable(TracingContext.class), + Mockito.nullable(ContextEncryptionAdapter.class)); + + Path sourceDir = path("/testSrc"); + assertMkdirs(fs, sourceDir); + String filename = "file1"; + Path sourceFilePath = new Path(sourceDir, filename); + touch(sourceFilePath); + Path destFilePath = new Path(sourceDir, "file2"); + + String errorMessage = intercept(AbfsDriverException.class, + () -> fs.rename(sourceFilePath, destFilePath)).getErrorMessage(); + + Assertions.assertThat(errorMessage) + .describedAs("getPathStatus should fail while recovering") + .contains(ERR_RENAME_RECOVERY); + } + } + + /** + * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts + * the Abfs operation and simulates an HTTP not-found (HTTP 404) error on the + * first invocation. It creates a mock HTTP operation with a PUT method and + * specific status codes and error messages. + * + * @param abfsDfsClient The AbfsDfsClient to mock operations for. + * @param headers The list of HTTP headers to which request headers will be added. 
+ * + * @throws Exception If an error occurs during mock creation or operation execution. + */ + private void mockRetriedRequest(AbfsDfsClient abfsDfsClient, + final List<AbfsHttpHeader> headers) throws Exception { + TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient, + new MockIntercept<AbfsRestOperation>() { + private int count = 0; + + @Override + public void answer(final AbfsRestOperation mockedObj, + final InvocationOnMock answer) + throws AbfsRestOperationException { + if (count == 0) { + count = 1; + AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod(); + Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage(); + Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op) + .getStorageErrorCode(); + Mockito.doReturn(true).when(mockedObj).hasResult(); + Mockito.doReturn(op).when(mockedObj).getResult(); + Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode(); + headers.addAll(mockedObj.getRequestHeaders()); + throw new AbfsRestOperationException(HTTP_NOT_FOUND, + SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op); + } + } + }); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java new file mode 100644 index 00000000000..2c44ea896de --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import org.mockito.invocation.InvocationOnMock; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; + +/** + * Interface used to intercept and customize the behavior of mocked + * `AbfsRestOperation` objects. The implementing class should define + * how to handle the mock operation when it is invoked. + * + * @param <T> the type of the mocked object, typically an `AbfsRestOperation` + */ +public interface MockIntercept<T> { + + /** + * Defines custom behavior for handling the mocked object during its execution. 
+ * + * @param mockedObj the mocked `AbfsRestOperation` object + * @param answer the invocation details for the mock method + * @throws AbfsRestOperationException if an error occurs during the + * mock operation handling + */ + void answer(T mockedObj, InvocationOnMock answer) throws AbfsRestOperationException; +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index bc6c69ccc8f..a92fd6f6ca1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -51,6 +52,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP; import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlockList; @@ -337,4 +339,24 @@ public static void mockGetRenameBlobHandler(AbfsBlobClient blobClient, Mockito.nullable(String.class), Mockito.anyBoolean(), Mockito.any(TracingContext.class)); } + + /** + * Mocks the behavior of adding a client transaction ID to the request headers + * for the given AzureBlobFileSystem. 
This method generates a random transaction ID + * and adds it to the headers of the {@link AbfsDfsClient}. + * + * @param abfsDfsClient The mocked {@link AbfsDfsClient}. + * @param clientTransactionId An array to hold the generated transaction ID. + */ + public static void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient, + String[] clientTransactionId) { + Mockito.doAnswer(addClientTransactionId -> { + clientTransactionId[0] = UUID.randomUUID().toString(); + List<AbfsHttpHeader> headers = addClientTransactionId.getArgument(0); + headers.add( + new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, + clientTransactionId[0])); + return clientTransactionId[0]; + }).when(abfsDfsClient).addClientTransactionIdToHeader(Mockito.anyList()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index e8ab4291b32..2495e50f434 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; +import org.apache.hadoop.fs.azurebfs.MockIntercept; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.utils.Base64; import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; @@ -37,6 +38,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT; import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME; +import static org.mockito.ArgumentMatchers.any; /** * 
Unit test cases for the AbfsClient class. @@ -138,4 +140,41 @@ private boolean isThreadRunning(String threadName) { } return false; } + + /** + * Mocks the creation of an `AbfsRestOperation` for the given `AbfsClient` and intercepts its execution. + * This method sets up a mock behavior where the `AbfsRestOperation` will call the provided `MockIntercept` + * to handle custom logic during the operation execution. + * + * @param abfsClient the `AbfsClient` to mock the operation for + * @param mockIntercept the mock interceptor that defines custom behavior during the operation execution + * @throws Exception if an error occurs while mocking the operation creation + */ + public static void mockAbfsOperationCreation(final AbfsClient abfsClient, + final MockIntercept mockIntercept) throws Exception { + boolean[] flag = new boolean[1]; + Mockito.doAnswer(answer -> { + if (!flag[0]) { + flag[0] = true; + AbfsRestOperation op = Mockito.spy( + new AbfsRestOperation( + answer.getArgument(0), + abfsClient, + answer.getArgument(1), + answer.getArgument(2), + answer.getArgument(3), + abfsClient.getAbfsConfiguration() + )); + Mockito.doAnswer((answer1) -> { + mockIntercept.answer(op, answer1); + return null; + }).when(op) + .execute(any()); + Mockito.doReturn(true).when(op).isARetriedRequest(); + return op; + } + return answer.callRealMethod(); + }).when(abfsClient) + .getAbfsRestOperation(any(), any(), any(), any()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 03a3089923f..b040a4b12f9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -345,8 +345,17 @@ public void 
testRenameRecoveryFailsForDirFsLevel() throws Exception { Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName()); // source eTag does not match -> rename should be a failure + int newConnections; boolean renameResult = fs.rename(path1, path2); - assertEquals(false, renameResult); + if (getConfiguration().getIsClientTransactionIdEnabled()) { + // Recovery based on client transaction id should be successful + assertTrue(renameResult); + // One extra getPathStatus call should have happened + newConnections = 5; + } else { + assertFalse(renameResult); + newConnections = 4; + } // validating stat counters after rename // 3 calls should have happened in total for rename @@ -355,7 +364,7 @@ public void testRenameRecoveryFailsForDirFsLevel() throws Exception { // last getPathStatus call should be skipped assertThatStatisticCounter(ioStats, CONNECTIONS_MADE.getStatName()) - .isEqualTo(4 + connMadeBeforeRename); + .isEqualTo(newConnections + connMadeBeforeRename); // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 // retries happen internally within AbfsRestOperation execute() @@ -396,11 +405,16 @@ public void testDirRenameRecoveryUnsupported() throws Exception { fs.mkdirs(new Path(path1)); - // source eTag does not match -> throw exception - expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> - spyClient.renamePath(path1, path2, null, - testTracingContext, null, - false))); + if (getConfiguration().getIsClientTransactionIdEnabled()) { + // Recovery based on client transaction id should be successful + assertTrue(fs.rename(new Path(path1), new Path(path2))); + } else { + // source eTag does not match -> throw exception + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, + testTracingContext, null, + false))); + } } /** @@ -539,11 +553,20 @@ public void testResilientCommitOperationTagMismatch() 
throws Throwable { final Path source = new Path(path1); touch(source); - final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); - intercept(FileNotFoundException.class, () -> - commit.commitSingleFileByRename(source, new Path(path2), "not the right tag")); + // When client transaction ID is enabled, the commit should succeed. + if (getConfiguration().getIsClientTransactionIdEnabled()) { + Pair<Boolean, Duration> response = commit.commitSingleFileByRename(source, new Path(path2), + "not the right tag"); + Assertions.assertThat(response.getKey()) + .describedAs("Recovery using client transaction ID") + .isTrue(); + } else { + intercept(FileNotFoundException.class, () -> + commit.commitSingleFileByRename(source, new Path(path2), + "not the right tag")); + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org