This is an automated email from the ASF dual-hosted git repository. anujmodi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new bc5a5b8543a HADOOP-19658. ABFS:Create and rename idempotency for FNS Blob (#7914) bc5a5b8543a is described below commit bc5a5b8543a2cc35d3c2e8ccb52329de62e78639 Author: Anmol Asrani <anmol.asrani...@gmail.com> AuthorDate: Wed Sep 3 04:12:35 2025 +0000 HADOOP-19658. ABFS:Create and rename idempotency for FNS Blob (#7914) Contributed by Anmol Asrani --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 ++ .../fs/azurebfs/constants/ConfigurationKeys.java | 2 + .../constants/FileSystemConfigurations.java | 2 + .../fs/azurebfs/services/AbfsBlobClient.java | 32 ++- .../hadoop/fs/azurebfs/services/AbfsIoUtils.java | 12 +- .../fs/azurebfs/ITestAbfsNetworkStatistics.java | 264 +++++++++++---------- .../azurebfs/ITestAzureBlobFileSystemCreate.java | 110 ++++++++- .../fs/azurebfs/ITestAzureBlobFileSystemLease.java | 2 +- .../azurebfs/ITestAzureBlobFileSystemRename.java | 82 +++++++ .../fs/azurebfs/ITestWasbAbfsCompatibility.java | 6 +- .../fs/azurebfs/services/AbfsClientTestUtil.java | 117 +++++++++ 11 files changed, 508 insertions(+), 135 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index f570f82c5be..7ae8f76b187 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -492,6 +492,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID) private boolean enableClientTransactionId; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, + DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY) + private boolean enableCreateIdempotency; + private String clientProvidedEncryptionKey; private String 
clientProvidedEncryptionKeySHA; @@ -1047,6 +1051,12 @@ public String getAzureAtomicRenameDirs() { } public boolean isConditionalCreateOverwriteEnabled() { + // If either the configured FS service type or the ingress service type is BLOB, + // conditional create-overwrite is not used. + if (getIsCreateIdempotencyEnabled() && (getFsConfiguredServiceType() == AbfsServiceType.BLOB + || getIngressServiceType() == AbfsServiceType.BLOB)) { + return false; + } return this.enableConditionalCreateOverwrite; } @@ -1178,6 +1188,10 @@ public boolean getIsClientTransactionIdEnabled() { return enableClientTransactionId; } + public boolean getIsCreateIdempotencyEnabled() { + return enableCreateIdempotency; + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index ef39fb11b2d..7d73f1a3fe7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -430,6 +430,8 @@ public static String containerProperty(String property, String fsName, String ac public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread"; /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/ public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id"; + /**Flag to enable/disable create idempotency during create operation: {@value}*/ + public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = "fs.azure.enable.create.blob.idempotency"; private ConfigurationKeys() {} } diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index d53e936fd5c..640a658b955 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -240,5 +240,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index bb46a97835f..77aca70990f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path, final TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsRestOperation op; if (isFileCreation) { - // Create a file with the specified parameters - op = createFile(path, overwrite, permissions, isAppendBlob, eTag, - contextEncryptionAdapter, tracingContext); + if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) { + AbfsRestOperation statusOp = null; + try { + // Check if the file already exists by calling GetPathStatus + statusOp = getPathStatus(path, tracingContext, null, false); + } catch (AbfsRestOperationException ex) { + // If the path does not exist, continue with file creation + // For other errors, rethrow the exception + if (ex.getStatusCode() 
!= HTTP_NOT_FOUND) { + throw ex; + } + } + // If the file exists and overwrite is not allowed, throw conflict + if (statusOp != null && statusOp.hasResult() && !overwrite) { + throw new AbfsRestOperationException( + HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } else { + // Proceed with file creation (force overwrite = true) + op = createFile(path, true, permissions, isAppendBlob, eTag, + contextEncryptionAdapter, tracingContext); + } + } else { + op = createFile(path, overwrite, permissions, isAppendBlob, eTag, + contextEncryptionAdapter, tracingContext); + } } else { // Create a directory with the specified parameters op = createDirectory(path, permissions, isAppendBlob, eTag, @@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path, if (eTag != null && !eTag.isEmpty()) { requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); } - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = getAbfsRestOperation( AbfsRestOperationType.PutBlob, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java index 44fa2d8d8bd..22fd9e15b6b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java @@ -18,8 +18,10 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin, if (key == null) { key = "HTTP Response"; } - String values = StringUtils.join(";", entry.getValue()); + List<String> valuesList = 
entry.getValue(); + if (valuesList == null) { + valuesList = Collections.emptyList(); + } else { + valuesList = valuesList.stream() + .map(v -> v == null ? "" : v) // replace null with empty string + .collect(Collectors.toList()); + } + String values = StringUtils.join(";", valuesList); if (key.contains("Cookie")) { values = "*cookie info*"; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index e29bfc5f624..5eb0c3d9910 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -25,9 +25,11 @@ import org.slf4j.LoggerFactory; import org.junit.jupiter.api.Test; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; @@ -41,6 +43,8 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY; public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { @@ -74,134 +78,69 @@ private int countDirectory(String path) { public void testAbfsHttpSendStatistics() throws 
IOException { describe("Test to check correct values of statistics after Abfs http send " + "request is done."); + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, false); + FileSystem fileSystem = FileSystem.newInstance(conf); + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem) { + Map<String, Long> metricMap; + Path sendRequestPath = path(getMethodName()); + String path = sendRequestPath.toString(); + int directory = countDirectory(path); + String testNetworkStatsString = "http_send"; + + metricMap = getInstrumentationMap(fs); + long expectedConnectionsMade = metricMap.get( + CONNECTIONS_MADE.getStatName()); + long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName()); + long expectedBytesSent = 0; + AbfsClient client = fs.getAbfsStore() + .getClientHandler() + .getIngressClient(); - AzureBlobFileSystem fs = getFileSystem(); - Map<String, Long> metricMap; - Path sendRequestPath = path(getMethodName()); - String path = sendRequestPath.toString(); - int directory = countDirectory(path); - String testNetworkStatsString = "http_send"; - - metricMap = getInstrumentationMap(fs); - long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName()); - long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName()); - long expectedBytesSent = 0; - AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient(); - - // -------------------------------------------------------------------- - // Operation: Creating AbfsOutputStream - try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, - sendRequestPath)) { - // Network stats calculation: For Creating AbfsOutputStream: - // 1 create request = 1 connection made and 1 send request - if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { - expectedRequestsSent += (directory); - // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call. 
- expectedConnectionsMade += ((directory * 2)); - } else { - expectedRequestsSent++; - expectedConnectionsMade++; - } - // -------------------------------------------------------------------- - - // Operation: Write small data - // Network stats calculation: No additions. - // Data written is less than the buffer size and hence will not - // trigger any append request to store - out.write(testNetworkStatsString.getBytes()); - // -------------------------------------------------------------------- - - // Operation: HFlush - // Flushes all outstanding data (i.e. the current unfinished packet) - // from the client into the service on all DataNode replicas. - out.hflush(); - /* - * Network stats calculation: - * 3 possibilities here: - * A. As there is pending data to be written to store, this will result in: - * 1 append + 1 flush = 2 connections and 2 send requests - * - * B. If config "fs.azure.enable.small.write.optimization" is enabled, append - * and flush call will be merged for small data in buffer in this test. - * In which case it will be: - * 1 append+flush request = 1 connection and 1 send request - * - * C. If the path is configured for append Blob files to be used, hflush - * is a no-op. 
So in this case: - * 1 append = 1 connection and 1 send request - */ - if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString()) - || (fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) { - expectedConnectionsMade++; - expectedRequestsSent++; - } else { - expectedConnectionsMade += 2; - expectedRequestsSent += 2; - } - expectedBytesSent += testNetworkStatsString.getBytes().length; // -------------------------------------------------------------------- - - // Assertions - metricMap = getInstrumentationMap(fs); - assertAbfsStatistics(CONNECTIONS_MADE, - expectedConnectionsMade, metricMap); - assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, - metricMap); - assertAbfsStatistics(AbfsStatistic.BYTES_SENT, - expectedBytesSent, metricMap); - } - - // -------------------------------------------------------------------- - // Operation: AbfsOutputStream close. - // Network Stats calculation: 1 flush (with close) is send. - // 1 flush request = 1 connection and 1 send request - // Flush with no data is a no-op for blob endpoint, hence update only for dfs endpoint. - if (client instanceof AbfsDfsClient) { - expectedConnectionsMade++; - expectedRequestsSent++; - } - // -------------------------------------------------------------------- - - // Operation: Re-create the file / create overwrite scenario - try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, - sendRequestPath)) { - /* - * Network Stats calculation: create overwrite - * There are 2 possibilities here. - * A. create overwrite results in 1 server call - * create with overwrite=true = 1 connection and 1 send request - * - * B. 
If config "fs.azure.enable.conditional.create.overwrite" is enabled, - * create overwrite=false (will fail in this case as file is indeed present) - * + getFileStatus to fetch the file ETag - * + create overwrite=true - * = 3 connections and 2 send requests in case of Dfs Client - * = 1 ListBlob + 2 GPS + 2 PutBlob - */ - if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) { + // Operation: Creating AbfsOutputStream + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + // Network stats calculation: For Creating AbfsOutputStream: + // 1 create request = 1 connection made and 1 send request if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { - expectedRequestsSent += 2; - expectedConnectionsMade += 5; + expectedRequestsSent += (directory); + // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call. + expectedConnectionsMade += ((directory * 2)); } else { - expectedConnectionsMade += 3; - expectedRequestsSent += 2; + expectedRequestsSent++; + expectedConnectionsMade++; } - } else { - expectedConnectionsMade += 1; - expectedRequestsSent += 1; - } - // -------------------------------------------------------------------- + // -------------------------------------------------------------------- - // Operation: Multiple small appends + hflush - for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) { + // Operation: Write small data + // Network stats calculation: No additions. + // Data written is less than the buffer size and hence will not + // trigger any append request to store out.write(testNetworkStatsString.getBytes()); - // Network stats calculation: no-op. Small write + // -------------------------------------------------------------------- + + // Operation: HFlush + // Flushes all outstanding data (i.e. the current unfinished packet) + // from the client into the service on all DataNode replicas. 
out.hflush(); - // Network stats calculation: Hflush - // refer to previous comments for hFlush network stats calcualtion - // possibilities + /* + * Network stats calculation: + * 3 possibilities here: + * A. As there is pending data to be written to store, this will result in: + * 1 append + 1 flush = 2 connections and 2 send requests + * + * B. If config "fs.azure.enable.small.write.optimization" is enabled, append + * and flush call will be merged for small data in buffer in this test. + * In which case it will be: + * 1 append+flush request = 1 connection and 1 send request + * + * C. If the path is configured for append Blob files to be used, hflush + * is a no-op. So in this case: + * 1 append = 1 connection and 1 send request + */ if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString()) - || (this.getConfiguration().isSmallWriteOptimizationEnabled())) { + || (fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) { expectedConnectionsMade++; expectedRequestsSent++; } else { @@ -209,16 +148,91 @@ public void testAbfsHttpSendStatistics() throws IOException { expectedRequestsSent += 2; } expectedBytesSent += testNetworkStatsString.getBytes().length; + // -------------------------------------------------------------------- + + // Assertions + metricMap = getInstrumentationMap(fs); + assertAbfsStatistics(CONNECTIONS_MADE, + expectedConnectionsMade, metricMap); + assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, + metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + expectedBytesSent, metricMap); + } + + // -------------------------------------------------------------------- + // Operation: AbfsOutputStream close. + // Network Stats calculation: 1 flush (with close) is send. + // 1 flush request = 1 connection and 1 send request + // Flush with no data is a no-op for blob endpoint, hence update only for dfs endpoint. 
+ if (client instanceof AbfsDfsClient) { + expectedConnectionsMade++; + expectedRequestsSent++; } // -------------------------------------------------------------------- - // Assertions - metricMap = fs.getInstrumentationMap(); - assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap); - assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap); - assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap); + // Operation: Re-create the file / create overwrite scenario + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + /* + * Network Stats calculation: create overwrite + * There are 2 possibilities here. + * A. create overwrite results in 1 server call + * create with overwrite=true = 1 connection and 1 send request + * + * B. If config "fs.azure.enable.conditional.create.overwrite" is enabled, + * create overwrite=false (will fail in this case as file is indeed present) + * + getFileStatus to fetch the file ETag + * + create overwrite=true + * = 3 connections and 2 send requests in case of Dfs Client + * = 1 ListBlob + 2 GPS + 2 PutBlob + */ + if (fs.getAbfsStore() + .getAbfsConfiguration() + .isConditionalCreateOverwriteEnabled()) { + if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { + expectedRequestsSent += 2; + expectedConnectionsMade += 5; + } else { + expectedConnectionsMade += 3; + expectedRequestsSent += 2; + } + } else { + expectedConnectionsMade += 1; + expectedRequestsSent += 1; + } + // -------------------------------------------------------------------- + + // Operation: Multiple small appends + hflush + for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) { + out.write(testNetworkStatsString.getBytes()); + // Network stats calculation: no-op. 
Small write + out.hflush(); + // Network stats calculation: Hflush + // refer to previous comments for hFlush network stats calculation + // possibilities + if (fs.getAbfsStore() + .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString()) + || (this.getConfiguration().isSmallWriteOptimizationEnabled())) { + expectedConnectionsMade++; + expectedRequestsSent++; + } else { + expectedConnectionsMade += 2; + expectedRequestsSent += 2; + } + expectedBytesSent += testNetworkStatsString.getBytes().length; + } + // -------------------------------------------------------------------- + + // Assertions + metricMap = fs.getInstrumentationMap(); + assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, + metricMap); + assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, + metricMap); + } } - } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index c91b8a1f93b..f07b9a5e1dc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -88,6 +89,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE; +import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; @@ -96,6 +98,7 @@ import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -460,10 +463,18 @@ public void testDefaultCreateOverwriteFileTest() throws Throwable { public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) throws Throwable { + if (enableConditionalCreateOverwrite) { + assumeHnsEnabled(); + assumeDfsServiceType(); + assumeThat(getIngressServiceType()) + .as("DFS service type is required for this test") + .isEqualTo(AbfsServiceType.DFS); + } try (AzureBlobFileSystem currentFs = getFileSystem()) { Configuration config = new Configuration(this.getRawConfiguration()); config.set("fs.azure.enable.conditional.create.overwrite", Boolean.toString(enableConditionalCreateOverwrite)); + config.set("fs.azure.enable.create.idempotency", "false"); AzureBlobFileSystemStore store = currentFs.getAbfsStore(); AbfsClient client = store.getClientHandler().getIngressClient(); @@ -595,7 +606,11 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) @Test public void testNegativeScenariosForCreateOverwriteDisabled() throws Throwable { - + assumeHnsEnabled(); + assumeDfsServiceType(); + assumeThat(getIngressServiceType()) + .as("DFS service type is required for this 
test") + .isEqualTo(AbfsServiceType.DFS); try (AzureBlobFileSystem currentFs = getFileSystem()) { Configuration config = new Configuration(this.getRawConfiguration()); config.set("fs.azure.enable.conditional.create.overwrite", @@ -1087,6 +1102,7 @@ public void testParallelCreateOverwriteFalse() throws Exception { Configuration configuration = getRawConfiguration(); configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false"); + configuration.set(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, "false"); try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( configuration)) { ExecutorService executorService = Executors.newFixedThreadPool(5); @@ -2236,6 +2252,98 @@ public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception { } } + /** + * Test to simulate a successful create operation followed by a connection reset + * on the response, triggering a retry. + * + * This test verifies that the create operation is retried in the event of a + * connection reset during the response phase. The test creates a mock + * AzureBlobFileSystem and its associated components to simulate the create + * operation and the connection reset. It then verifies that the create + * operation is retried once before succeeding. + * + * @throws Exception if an error occurs during the test execution. 
+ */ + @Test + public void testCreateIdempotencyForNonHnsBlob() throws Exception { + assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse(); + assumeHnsDisabled(); + assumeBlobServiceType(); + // Create a spy of AzureBlobFileSystem + try (AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { + // Create a spy of AzureBlobFileSystemStore + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + + // Create spies for the client handler and blob client + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + fs.getAbfsStore().setClient(blobClient); + fs.getAbfsStore().setClientHandler(clientHandler); + // Set up the spies to return the mocked objects + Mockito.doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + + AtomicInteger createCount = new AtomicInteger(0); + + Mockito.doAnswer(answer -> { + // Set up the mock for the create operation + AbfsClientTestUtil.setMockAbfsRestOperationForCreateOperation(blobClient, + (httpOperation) -> { + Mockito.doAnswer(invocation -> { + // Call the real processResponse method + invocation.callRealMethod(); + + int currentCount = createCount.incrementAndGet(); + if (currentCount == 2) { + Mockito.when(httpOperation.getStatusCode()) + .thenReturn( + HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error + Mockito.when(httpOperation.getStorageErrorMessage()) + .thenReturn("CONNECTION_RESET"); // Error message + throw new IOException("Connection Reset"); + } + return null; + }).when(httpOperation).processResponse( + Mockito.nullable(byte[].class), + Mockito.anyInt(), + Mockito.anyInt() + ); + + return httpOperation; + }); + return answer.callRealMethod(); + }).when(blobClient).createPath( + 
Mockito.anyString(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.any(AzureBlobFileSystemStore.Permissions.class), + Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class), + any(TracingContext.class) + ); + + Path path = new Path("/test/file"); + fs.create(path, false); + Mockito.verify(blobClient, Mockito.times(1)).createPath( + Mockito.anyString(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.any(AzureBlobFileSystemStore.Permissions.class), + Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class), + any(TracingContext.class)); + + Mockito.verify(blobClient, Mockito.times(2)).createPathRestOp( + Mockito.anyString(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class), + any(TracingContext.class)); + assertIsFile(fs, path); + } + } + /** * Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem. * This method sets up the necessary mock behavior for the client handler and ingress client. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java index e7dcc78ace9..95bd76c023b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java @@ -160,7 +160,7 @@ public void testTwoCreate() throws Exception { try (FSDataOutputStream out = fs.create(testFilePath)) { LambdaTestUtils.intercept(IOException.class, - isHNSEnabled ? PARALLEL_ACCESS + isHNSEnabled && getIngressServiceType() == AbfsServiceType.DFS ? PARALLEL_ACCESS : client instanceof AbfsBlobClient ? 
ERR_NO_LEASE_ID_SPECIFIED_BLOB : ERR_NO_LEASE_ID_SPECIFIED, () -> { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 1be40c09dbc..73a826c601a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; @@ -74,6 +75,7 @@ import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT; import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_FORBIDDEN; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; @@ -108,6 +110,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assumptions.assumeThat; /** * Test rename operation. 
@@ -1702,6 +1705,85 @@ public void testRenamePathRetryIdempotency() throws Exception { } } + /** + * Test to simulate a successful copy blob operation followed by a connection reset + * on the response, triggering a retry. + * + * This test verifies that the copy blob operation is retried in the event of a + * connection reset during the response phase. The test creates a mock + * AzureBlobFileSystem and its associated components to simulate the copy blob + * operation and the connection reset. It then verifies that the copy blob + * operation is retried once before succeeding. + * + * @throws Exception if an error occurs during the test execution. + */ + @Test + public void testRenameIdempotencyForNonHnsBlob() throws Exception { + assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse(); + assumeHnsDisabled(); + assumeBlobServiceType(); + // Create a spy of AzureBlobFileSystem + try (AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { + // Create a spy of AzureBlobFileSystemStore + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + + // Create spies for the client handler and blob client + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + fs.getAbfsStore().setClient(blobClient); + fs.getAbfsStore().setClientHandler(clientHandler); + // Set up the spies to return the mocked objects + Mockito.doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + + AtomicInteger copyBlobCount = new AtomicInteger(0); + Path sourceDir = path("/testSrc"); + assertMkdirs(fs, sourceDir); + String filename = "file1"; + Path sourceFilePath = new Path(sourceDir, filename); + touch(sourceFilePath); + Path destFilePath = new Path(sourceDir, "file2"); + 
Mockito.doAnswer(answer -> { + // Set up the mock for the copy blob operation + AbfsClientTestUtil.setMockAbfsRestOperationForCopyBlobOperation(blobClient, sourceFilePath, destFilePath, + (httpOperation) -> { + Mockito.doAnswer(invocation -> { + // Call the real processResponse method + invocation.callRealMethod(); + + int currentCount = copyBlobCount.incrementAndGet(); + if (currentCount == 1) { + Mockito.when(httpOperation.getStatusCode()) + .thenReturn( + HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error + Mockito.when(httpOperation.getStorageErrorMessage()) + .thenReturn("CONNECTION_RESET"); // Error message + throw new IOException("Connection Reset"); + } + return null; + }).when(httpOperation).processResponse( + Mockito.nullable(byte[].class), + Mockito.anyInt(), + Mockito.anyInt() + ); + + return httpOperation; + }); + return answer.callRealMethod(); + }).when(blobClient).copyBlob( + Mockito.any(Path.class), + Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class) + ); + Assertions.assertThat(fs.rename(sourceFilePath, destFilePath)) + .describedAs("Rename should succeed.") + .isTrue(); + } + } + /** * Test to verify that the client transaction ID is included in the response header * after renaming a file in Azure Blob Storage. 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java index ec63c3dcda7..e9f219c20fb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java @@ -622,8 +622,8 @@ public void testScenario8() throws Exception { .isEqualTo(HTTP_CONFLICT); } Assertions.assertThat(e.getMessage()) - .as("Expected error message to contain 'AlreadyExists'") - .contains("AlreadyExists"); + .as("Expected error message to contain 'Exists'") + .containsIgnoringCase("Exists"); } // Remove file @@ -2069,4 +2069,4 @@ private static void assertIsFile(Path path, FileStatus status) { .as("Expected a regular file, but was a symlink: %s %s", path, status) .isFalse(); } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index b9dcefc35e2..cd1a2af7d6c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.assertj.core.api.Assertions; @@ -41,6 +42,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.util.functional.FunctionRaisingIOE; @@ -50,16 +52,22 @@ import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_BLOB_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP; +import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlob; import static 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlockList; import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth; import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; @@ -192,6 +200,115 @@ public static void setMockAbfsRestOperationForFlushOperation( addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE); } + /** + * Sets up a mocked {@link AbfsRestOperation} for a create (PutBlob) operation + * in the Azure Blob File System (ABFS). + * <p> + * This method is intended for use in testing scenarios where the behavior of + * a create request needs to be simulated. It configures a mock + * {@link AbfsRestOperation} with the appropriate request headers and parameters + * for a {@code PutBlob} call, and applies the provided {@code functionRaisingIOE} + * to customize the behavior of the underlying {@link AbfsHttpOperation}. + * <p> + * + * @param spiedClient the spied instance of {@link AbfsClient} used + * for making HTTP requests + * @param functionRaisingIOE a function that customizes the behavior of the + * {@link AbfsRestOperation}'s associated + * {@link AbfsHttpOperation}, enabling the simulation + * of error conditions or special responses + * @throws Exception if an error occurs while setting up the mocked + * operation + */ + public static void setMockAbfsRestOperationForCreateOperation( + final AbfsClient spiedClient, + FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> functionRaisingIOE) + throws Exception { + List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders( + spiedClient); + requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE)); + requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, AbfsHttpConstants.ZERO)); + requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML)); + final AbfsUriQueryBuilder abfsUriQueryBuilder = spiedClient.createDefaultUriQueryBuilder(); + final URL url = 
spiedClient.createRequestUrl("/test/file", abfsUriQueryBuilder.toString()); + AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( + PutBlob, spiedClient, HTTP_METHOD_PUT, + url, + requestHeaders, + spiedClient.getAbfsConfiguration())); + + Mockito.doReturn(abfsRestOperation) + .when(spiedClient) + .getAbfsRestOperation(eq(AbfsRestOperationType.PutBlob), + Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList()); + + addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE); + } + + /** + * Sets up a mocked {@link AbfsRestOperation} for a CopyBlob operation + * in the Azure Blob File System (ABFS). + * <p> + * This method is intended for use in testing scenarios where the behavior of + * a copyBlob request needs to be simulated. It configures a mock + * {@link AbfsRestOperation} with the appropriate request headers and parameters + * for a {@code CopyBlob} call, and applies the provided {@code functionRaisingIOE} + * to customize the behavior of the underlying {@link AbfsHttpOperation}. 
+ * <p> + * + * @param spiedClient the spied instance of {@link AbfsClient} used + * for making HTTP requests + * @param srcPath the source blob path + * @param dstPath the destination blob path + * @param functionRaisingIOE a function that customizes the behavior of the + * {@link AbfsRestOperation}'s associated + * {@link AbfsHttpOperation}, enabling the simulation + * of error conditions or special responses + * @throws Exception if an error occurs while setting up the mocked + * operation + */ + public static void setMockAbfsRestOperationForCopyBlobOperation( + final AbfsClient spiedClient, + final Path srcPath, + final Path dstPath, + FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> functionRaisingIOE) + throws Exception { + + // Prepare headers + List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders(spiedClient); + + // Add CopyBlob specific headers + AbfsUriQueryBuilder abfsUriQueryBuilderDst = spiedClient.createDefaultUriQueryBuilder(); + AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder(); + + String dstBlobRelativePath = dstPath.toUri().getPath(); + String srcBlobRelativePath = srcPath.toUri().getPath(); + + final URL url = spiedClient.createRequestUrl( + dstBlobRelativePath, abfsUriQueryBuilderDst.toString()); + final String sourcePathUrl = spiedClient.createRequestUrl( + srcBlobRelativePath, abfsUriQueryBuilderSrc.toString()).toString(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + + // Spy on the real CopyBlob operation + AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( + AbfsRestOperationType.CopyBlob, + spiedClient, + HTTP_METHOD_PUT, + url, + requestHeaders, + spiedClient.getAbfsConfiguration())); + + Mockito.doReturn(abfsRestOperation) + .when(spiedClient) + .getAbfsRestOperation(eq(AbfsRestOperationType.CopyBlob), + Mockito.nullable(String.class), Mockito.any(URL.class), 
Mockito.anyList()); + + addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE); + } + /** * Adding general mock behaviour to AbfsRestOperation and AbfsHttpOperation * to avoid any NPE occurring. These will avoid any network call made and --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org