This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-18073-sdk-v2-upgrade-3.6-aws
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit d5472f9d6db27e4c44cd336c614aa6211e945fa2
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Fri Sep 15 15:45:09 2023 +0100

    HADOOP-18888. S3A. createS3AsyncClient() always enables multipart uploads (#6056)

    * The multipart flag fs.s3a.multipart.uploads.enabled is passed to the
      async client created.
    * The S3A connector bypasses the transfer manager entirely if multipart
      upload is disabled or the file is below the multipart threshold.

    Contributed by Steve Loughran
---
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 93 +++++++++++++++-------
 .../org/apache/hadoop/fs/s3a/S3AInternals.java     |  6 ++
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  | 24 ++++++
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |  4 +-
 .../fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java | 53 +++++-------
 6 files changed, 118 insertions(+), 64 deletions(-)
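To illustrate the behaviour described above, a client can turn the flag off and
query the new probe roughly as follows. This is a minimal sketch, not part of the
patch: the bucket URI is a placeholder, and it assumes a hadoop-aws build that
contains this change plus valid S3 credentials on the classpath.

// Minimal sketch (not part of the patch): disable multipart uploads and
// observe which copy path the S3A connector will take.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class MultipartCopyProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // the flag named in the commit message; with this change it now reaches
    // the async S3 client and switches copyFile() to single-part copyObject()
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);

    // "s3a://example-bucket/" is a placeholder bucket
    try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
      S3AFileSystem s3a = (S3AFileSystem) fs;
      // probe added by this commit: false means the transfer manager is bypassed
      System.out.println("multipart copy enabled: "
          + s3a.getS3AInternals().isMultipartCopyEnabled());
    }
  }
}

Even when the flag is left at its default, copies smaller than the configured
multipart threshold (multiPartThreshold in the diff below) now stay on the
single-request path.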
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 98c72d276628..c85263f1903a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -112,7 +112,7 @@ public class DefaultS3ClientFactory extends Configured
     return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
         .httpClientBuilder(httpClientBuilder)
         .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(true)
+        .multipartEnabled(parameters.isMultipartCopy())
         .build();
   }

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index d0a32148c9d5..da828123ec77 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;

+  /**
+   * Should file copy operations use the S3 transfer manager?
+   * True unless multipart upload is disabled.
+   */
+  private boolean isMultipartCopyEnabled;
+
   /**
    * A cache of files that should be deleted when the FileSystem is closed
    * or the JVM is exited.
@@ -576,6 +582,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     this.prefetchBlockSize = (int) prefetchBlockSizeLong;
     this.prefetchBlockCount =
         intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+    this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+    // multipart copy and upload are the same; this just makes it explicit
+    this.isMultipartCopyEnabled = isMultipartUploadEnabled;

     initThreadPools(conf);

@@ -983,6 +993,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
         .withExecutionInterceptors(auditManager.createExecutionInterceptors())
         .withMinimumPartSize(partSize)
+        .withMultipartCopyEnabled(isMultipartCopyEnabled)
         .withMultipartThreshold(multiPartThreshold)
         .withTransferManagerExecutor(unboundedThreadPool)
         .withRegion(region);
@@ -1468,6 +1479,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       LOG.debug("Sharing credentials for: {}", purpose);
       return credentials.share();
     }
+
+    @Override
+    public boolean isMultipartCopyEnabled() {
+      return S3AFileSystem.this.isMultipartUploadEnabled;
+    }
   }

   /**
@@ -4432,37 +4448,56 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           e);
     }

-    return readInvoker.retry(
-        action, srcKey,
-        true,
-        () -> {
-          CopyObjectRequest.Builder copyObjectRequestBuilder =
-              getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
-          changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
-          incrementStatistic(OBJECT_COPY_REQUESTS);
-
-          Copy copy = transferManager.copy(
-              CopyRequest.builder()
-                  .copyObjectRequest(copyObjectRequestBuilder.build())
-                  .build());
+    CopyObjectRequest.Builder copyObjectRequestBuilder =
+        getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
+    changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
+    CopyObjectResponse response;

-          try {
-            CompletedCopy completedCopy = copy.completionFuture().join();
-            CopyObjectResponse result = completedCopy.response();
-            changeTracker.processResponse(result);
-            incrementWriteOperations();
-            instrumentation.filesCopied(1, size);
-            return result;
-          } catch (CompletionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof SdkException) {
-              SdkException awsException = (SdkException)cause;
-              changeTracker.processException(awsException, "copy");
-              throw awsException;
+    // transfer manager is skipped if disabled or the file is too small to worry about
+    final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
+    if (useTransferManager) {
+      // use transfer manager
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            incrementStatistic(OBJECT_COPY_REQUESTS);
+
+            Copy copy = transferManager.copy(
+                CopyRequest.builder()
+                    .copyObjectRequest(copyObjectRequestBuilder.build())
+                    .build());
+
+            try {
+              CompletedCopy completedCopy = copy.completionFuture().join();
+              return completedCopy.response();
+            } catch (CompletionException e) {
+              Throwable cause = e.getCause();
+              if (cause instanceof SdkException) {
+                SdkException awsException = (SdkException)cause;
+                changeTracker.processException(awsException, "copy");
+                throw awsException;
+              }
+              throw extractException(action, srcKey, e);
             }
-            throw extractException(action, srcKey, e);
-          }
-        });
+          });
+    } else {
+      // single part copy bypasses the transfer manager;
+      // note: this helps with some mock testing, e.g. HBoss, as there is less to mock.
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
+            incrementStatistic(OBJECT_COPY_REQUESTS);
+            return s3Client.copyObject(copyObjectRequestBuilder.build());
+          });
+    }
+
+    changeTracker.processResponse(response);
+    incrementWriteOperations();
+    instrumentation.filesCopied(1, size);
+    return response;
   }

   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
index 23c4d3501206..18d6c1af586f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
@@ -115,4 +115,10 @@ public interface S3AInternals {
   @AuditEntryPoint
   @Retries.RetryTranslated
   HeadBucketResponse getBucketMetadata() throws IOException;
+
+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  boolean isMultipartCopyEnabled();
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index d4504cd08d74..e2e792ebfb66 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -156,6 +156,11 @@ public interface S3ClientFactory {
      */
     private long multiPartThreshold;

+    /**
+     * Multipart upload enabled.
+     */
+    private boolean multipartCopy = true;
+
     /**
      * Executor that the transfer manager will use to execute background tasks.
      */
@@ -399,5 +404,24 @@ public interface S3ClientFactory {
     public Region getRegion() {
       return region;
     }
+
+    /**
+     * Set the multipart flag.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
+      this.multipartCopy = value;
+      return this;
+    }
+
+    /**
+     * Get the multipart flag.
+     * @return multipart flag
+     */
+    public boolean isMultipartCopy() {
+      return multipartCopy;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index f30eb0f11ae0..628b56704182 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -355,10 +355,10 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   /**
    * Is this expected to be a multipart upload?
    * Assertions will change if not.
-   * @return true by default.
+   * @return what the filesystem expects.
    */
   protected boolean expectMultipartUpload() {
-    return true;
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
   }

   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
index ed300dba01ea..e154ab5676f8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
@@ -18,13 +18,10 @@

 package org.apache.hadoop.fs.s3a.scale;

+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;

 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
@@ -33,7 +30,6 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;

 /**
  * Use a single PUT for the whole upload/rename/delete workflow; include verification
@@ -41,11 +37,6 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {

-  /**
-   * Size to ensure MPUs don't happen in transfer manager.
-   */
-  public static final String S_1T = "1T";
-
   public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";

   /**
@@ -56,11 +47,23 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
     return Constants.FAST_UPLOAD_BUFFER_DISK;
   }

+  /**
+   * Multipart upload is always disabled.
+   * @return false
+   */
   @Override
   protected boolean expectMultipartUpload() {
     return false;
   }

+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  private boolean isMultipartCopyEnabled() {
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
+  }
+
   /**
    * Create a configuration without multipart upload,
    * and a long request timeout to allow for a very slow
    *
@@ -77,35 +80,21 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
         MULTIPART_SIZE,
         REQUEST_TIMEOUT);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
-    conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
-    conf.set(MULTIPART_SIZE, S_1T);
+    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
     conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
     return conf;
   }

   /**
-   * After the file is created, attempt a rename with an FS
-   * instance with a small multipart threshold;
-   * this MUST be rejected.
+   * Verify multipart copy is disabled.
    */
   @Override
   public void test_030_postCreationAssertions() throws Throwable {
-    assumeHugeFileExists();
-    final Path hugefile = getHugefile();
-    final Path hugefileRenamed = getHugefileRenamed();
-    describe("renaming %s to %s", hugefile, hugefileRenamed);
-    S3AFileSystem fs = getFileSystem();
-    fs.delete(hugefileRenamed, false);
-    // create a new fs with a small multipart threshold; expect rename failure.
-    final Configuration conf = new Configuration(fs.getConf());
-    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
-    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
-    S3ATestUtils.disableFilesystemCaching(conf);
-
-    try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
-      intercept(UnsupportedRequestException.class, () ->
-          fs2.rename(hugefile, hugefileRenamed));
-    }
+    super.test_030_postCreationAssertions();
+    Assertions.assertThat(isMultipartCopyEnabled())
+        .describedAs("Multipart copy should be disabled in %s", getFileSystem())
+        .isFalse();
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org