This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 0e208c8abd9 HADOOP-19256. S3A: Support Conditional Overwrites 0e208c8abd9 is described below commit 0e208c8abd982a658ecace110038a00c11ee41dc Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Wed Apr 9 17:59:21 2025 +0100 HADOOP-19256. S3A: Support Conditional Overwrites Amazon S3 now supports conditional overwrites, which can be be used when creating files through the createFile() API with two new builder options: fs.option.create.conditional.overwrite: Write if and only if there is no object at the target path. This is an atomic PUT-no-overwrite, checked in close(), not create(). fs.option.create.conditional.overwrite.etag Write a file if and only if it is overwriting a file with a specific etag. If the "fs.s3a.performance.flags" enumeration includes the flag "create" then file creation will use conditional creation to detect and reject overwrites. The configuration option "fs.s3a.create.conditional.enabled" can be set to false to disable these features on third-party stores. Contributed by Diljot Grewal, Saikat Roy and Steve Loughran --- .../main/java/org/apache/hadoop/fs/Options.java | 110 ++++ .../hadoop/fs/statistics/StoreStatisticNames.java | 6 + .../filesystem/fsdataoutputstreambuilder.md | 18 +- .../java/org/apache/hadoop/fs/s3a/Constants.java | 30 + .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 27 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 153 +++-- .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 13 +- .../java/org/apache/hadoop/fs/s3a/Statistic.java | 6 + .../apache/hadoop/fs/s3a/WriteOperationHelper.java | 3 +- .../apache/hadoop/fs/s3a/api/RequestFactory.java | 4 +- .../fs/s3a/commit/magic/S3MagicCommitTracker.java | 13 +- .../org/apache/hadoop/fs/s3a/impl/AWSHeaders.java | 2 + .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 140 +++-- .../hadoop/fs/s3a/impl/InternalConstants.java | 14 +- .../hadoop/fs/s3a/impl/PutObjectOptions.java | 81 ++- .../hadoop/fs/s3a/impl/RequestFactoryImpl.java | 38 +- .../hadoop/fs/s3a/impl/write/WriteObjectFlags.java | 71 +++ .../hadoop/fs/s3a/impl/write/package-info.java | 22 + .../statistics/BlockOutputStreamStatistics.java | 10 + .../statistics/impl/EmptyS3AStatisticsContext.java | 4 + .../site/markdown/tools/hadoop-aws/performance.md | 10 +- .../tools/hadoop-aws/third_party_stores.md | 39 +- .../fs/s3a/ITestS3AClientSideEncryption.java | 4 +- .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 4 +- .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 13 +- .../impl/ITestS3AConditionalCreateBehavior.java | 199 +++++++ .../s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java | 660 +++++++++++++++++++++ .../hadoop/fs/s3a/impl/TestCreateFileBuilder.java | 5 +- .../hadoop/fs/s3a/impl/TestRequestFactory.java | 24 +- .../fs/s3a/performance/ITestCreateFileCost.java | 6 +- .../fs/s3a/scale/ITestS3ADirectoryPerformance.java | 6 +- 31 files changed, 1625 insertions(+), 110 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index f473e9427ba..37143e9d75c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -710,4 +710,114 @@ private OpenFileOptions() { public static final String FS_OPTION_OPENFILE_EC_POLICY = FS_OPTION_OPENFILE + "ec.policy"; } + + /** + * The standard {@code createFile()} options. 
+ * <p> + * If an option is not supported during file creation and it is considered + * part of a commit protocol, then, when supplied in a must() option, + * it MUST be rejected. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public interface CreateFileOptionKeys { + + /** + * {@code createFile()} option to write a file in the close() operation iff + * there is nothing at the destination. + * this is the equivalent of {@code create(path, overwrite=true)} + * <i>except that the existence check is postponed to the end of the write</i>. + * <p> + * Value {@value}. + * </p> + * <p> + * This can be set in the builder. + * </p> + * <ol> + * <li>It is for object stores stores which only upload/manifest files + * at the end of the stream write.</li> + * <li>Streams which support it SHALL not manifest any object to + * the destination path until close()</li> + * <li>It MUST be declared as a stream capability in streams for which + * this overwrite is enabled.</li> + * <li>It MUST be exported as a path capability for all stores where + * the feature is available <i>and</i> enabled</li> + * <li>If passed to a filesystem as a {@code must()} parameter where + * the option value is {@code true}, and it is supported/enabled, + * the FS SHALL omit all overwrite checks in {@code create}, + * including for the existence of an object or a directory underneath. + * Instead, during {@code close()} the object will only be manifest + * at the target path if there is no object at the destination. + * </li> + * <li>The existence check and object creation SHALL be atomic.</li> + * <li>If passed to a filesystem as a {@code must()} parameter where + * the option value is {@code true}, and the FS does not recognise + * the feature, or it is recognized but disabled on this FS instance, + * the filesystem SHALL reject the request. + * </li> + * <li>If passed to a filesystem as a {@code opt()} parameter where + * the option value is {@code true}, the filesystem MAY ignore + * the request, or it MAY enable the feature. + * Any filesystem which does not support the feature, including + * from older releases, SHALL ignore it. + * </li> + * </ol> + */ + String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite"; + + /** + * Overwrite a file only if there is an Etag match. This option takes a string, + * + * Value {@value}. + * <p> + * This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}. + * <ol> + * <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li> + * <li>If supported and enabled, it SHALL be declared as a capability of the stream</li> + * <li>The string passed as the value SHALL be the etag value as returned by + * {@code EtagSource.getEtag()}</li> + * <li>This value MUST NOT be empty</li> + * <li>If passed to a filesystem which supports it, then when the file is created, + * the store SHALL check for the existence of a file/object at the destination + * path. 
+ * </li> + * <li>If there is no object there, the operation SHALL be rejected by raising + * either a {@code org.apache.hadoop.fs.FileAlreadyExistsException} + * exception, or a{@code java.nio.file.FileAlreadyExistsException} + * </li> + * <li>If there is an object there, its Etag SHALL be compared to the + * value passed here.</li> + * <li>If there is no match, the operation SHALL be rejected by raising + * either a {@code org.apache.hadoop.fs.FileAlreadyExistsException} + * exception, or a{@code java.nio.file.FileAlreadyExistsException} + * </li> + * <li>If the etag does match, the file SHALL be created.</li> + * <li>The check and create SHALL be atomic</li> + * <li>The check and create MAY be at the end of the write, in {@code close()}, + * or it MAY be in the {@code create()} operation. That is: some stores + * MAY perform the check early</li> + * <li>If supported and enabled, stores MAY check for the existence of subdirectories; + * this behavior is implementation-specific.</li> + * </ol> + */ + String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG = + "fs.option.create.conditional.overwrite.etag"; + + /** + * A flag which requires the filesystem to create files/objects in close(), + * rather than create/createFile. + * <p> + * Object stores with this behavior should also export it as a path capability. + * + * Value {@value}. + */ + String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close"; + + /** + * String to define the content filetype. + * Value {@value}. + */ + String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type"; + + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index e3deda77528..1b25e9f1fa0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -467,6 +467,12 @@ public final class StoreStatisticNames { public static final String MULTIPART_UPLOAD_LIST = "multipart_upload_list"; + public static final String CONDITIONAL_CREATE + = "conditional_create"; + + public static final String CONDITIONAL_CREATE_FAILED + = "conditional_create_failed"; + private StoreStatisticNames() { } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md index 7dd3170036c..f5569759f93 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md @@ -192,6 +192,7 @@ Here are the custom options which the S3A Connector supports. |-----------------------------|-----------|----------------------------------------| | `fs.s3a.create.performance` | `boolean` | create a file with maximum performance | | `fs.s3a.create.header` | `string` | prefix for user supplied headers | +| `fs.s3a.create.multipart` | `boolean` | create a multipart file | ### `fs.s3a.create.performance` @@ -200,7 +201,8 @@ Prioritize file creation performance over safety checks for filesystem consisten This: 1. Skips the `LIST` call which makes sure a file is being created over a directory. Risk: a file is created over a directory. -2. Ignores the overwrite flag. +2. 
If the overwrite flag is false and filesystem flag`fs.s3a.create.conditional.enabled` is true, + uses conditional creation to prevent the overwrite of any object at the destination. 3. Never issues a `DELETE` call to delete parent directory markers. It is possible to probe an S3A Filesystem instance for this capability through @@ -243,3 +245,17 @@ When an object is renamed, the metadata is propagated the copy created. It is possible to probe an S3A Filesystem instance for this capability through the `hasPathCapability(path, "fs.s3a.create.header")` check. + +### `fs.s3a.create.multipart` Create a multipart file + +Initiate a multipart upload when a file is created, rather +than only when the amount of data buffered reaches the threshold +set in `fs.s3a.multipart.size`. + +This is only relevant during testing, as it allows for multipart +operation to be initiated without writing any data, so +reducing test time. + +It is not recommended for production use, because as well as adding +more network IO, it is not compatible with third-party stores which +do not supprt multipart uploads. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 822f6822404..6c0efa6e5c3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1522,6 +1522,29 @@ private Constants() { */ public static final String FS_S3A_PERFORMANCE_FLAGS = "fs.s3a.performance.flags"; + + /** + * Is the create overwrite feature enabled or not? + * A configuration option and a path status probe. + * Value {@value}. + */ + public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED = + "fs.s3a.create.conditional.enabled"; + + /** + * Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}. + * Value {@value}. + */ + public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true; + + /** + * createFile() boolean option toreate a multipart file, always: {@value}. + * <p> + * This is inefficient and will not work on a store which doesn't support that feature, + * so is primarily for testing. + */ + public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart"; + /** * Prefix for adding a header to the object when created. * The actual value must have a "." suffix and then the actual header. @@ -1845,4 +1868,11 @@ private Constants() { public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = "fs.s3a.analytics.accelerator"; + /** + * Value for the {@code If-None-Match} HTTP header in S3 requests. + * Value: {@value}. 
+ * More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html"> + * AWS S3 PutObject API Documentation</a> + */ + public static final String IF_NONE_MATCH_STAR = "*"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index db1134dc5a2..9574485eb9d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -26,6 +26,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -52,6 +53,7 @@ import org.apache.hadoop.fs.s3a.impl.ProgressListener; import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; @@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements /** Is multipart upload enabled? */ private final boolean isMultipartUploadEnabled; + /** + * Object write option flags. + */ + private final EnumSet<WriteObjectFlags> writeObjectFlags; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements this.iostatistics = statistics.getIOStatistics(); this.writeOperationHelper = builder.writeOperations; this.putTracker = builder.putTracker; + this.writeObjectFlags = builder.putOptions.getWriteObjectFlags(); this.executorService = MoreExecutors.listeningDecorator( builder.executorService); this.multiPartUpload = null; @@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements ? builder.blockSize : -1; + // if required to be multipart by the committer put tracker or + // write flags (i.e createFile() options, initiate multipart uploads. + // this will fail fast if the store doesn't support multipart uploads if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); + } else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) { + // this not merged simply to avoid confusion + // to what to do it both are set, so as to guarantee + // the put tracker initialization always takes priority + // over any file flag. 
+ LOG.debug("Multipart initiated from createFile() options"); + initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; @@ -772,7 +790,8 @@ BlockOutputStreamStatistics getStatistics() { @SuppressWarnings("deprecation") @Override public boolean hasCapability(String capability) { - switch (capability.toLowerCase(Locale.ENGLISH)) { + final String cap = capability.toLowerCase(Locale.ENGLISH); + switch (cap) { // does the output stream have delayed visibility case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT: @@ -797,6 +816,12 @@ public boolean hasCapability(String capability) { return true; default: + // scan flags for the capability + for (WriteObjectFlags flag : writeObjectFlags) { + if (flag.hasKey(cap)) { + return true; + } + } return false; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2e0d00fd742..304ba032b41 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -153,6 +153,7 @@ import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks; import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements; import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -226,6 +227,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE; import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -512,6 +517,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean s3AccessGrantsEnabled; + /** + * Are the conditional create operations enabled? + */ + private boolean conditionalCreateEnabled; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -694,6 +704,9 @@ public void initialize(URI name, Configuration originalConf) " access points. 
Upgrading to V2"); useListV1 = false; } + conditionalCreateEnabled = conf.getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, + DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED); + signerManager = new SignerManager(bucket, this, conf, owner); signerManager.initCustomSigners(); @@ -2081,20 +2094,66 @@ private FSDataOutputStream innerCreateFile( } EnumSet<CreateFlag> flags = options.getFlags(); - boolean skipProbes = options.isPerformance() || isUnderMagicCommitPath(path); - if (skipProbes) { - LOG.debug("Skipping existence/overwrite checks"); - } else { + /* + Calculate whether to perform HEAD/LIST checks, + and whether the conditional create option should be set. + This seems complicated, but comes down to + "if explicitly requested and the FS enables it, use". + */ + // create file attributes + boolean cCreate = options.isConditionalOverwrite(); + boolean cEtag = options.isConditionalOverwriteEtag(); + boolean createPerf = options.isPerformance(); + boolean overwrite = flags.contains(CreateFlag.OVERWRITE); + + // path attributes + boolean magic = isUnderMagicCommitPath(path); + + // store options + // is CC available. + boolean ccAvailable = conditionalCreateEnabled; + + if (!ccAvailable && (cCreate || cEtag)) { + // fail fast if conditional creation is requested on an FS without it. + throw new PathIOException(path.toString(), "Conditional Writes Unavailable"); + } + + // probes to evaluate. + Set<StatusProbeEnum> probes = EnumSet.of( + StatusProbeEnum.List, StatusProbeEnum.Head); + + + // the PUT is conditional if requested, or if one of the + // this is a performance creation, overwrite has not been requested, + // this is not and etag write *and* conditional creation is available. + // write is NOT conditional etag write. + boolean conditionalPut = cCreate + || !(overwrite || cEtag) && ccAvailable && createPerf; + + // skip the HEAD check for many reasons + // old: the path is magic, it's an overwrite or the "create" performance is set. + // new: also skip if any conditional create operation is in progress + + boolean skipHead = + createPerf || magic || overwrite // classic reasons to skip HEAD + || cCreate || cEtag; // conditional creation + + if (skipHead) { + probes.remove(StatusProbeEnum.Head); + } + + // list logic + boolean skipList = createPerf || magic || cCreate || cEtag; + if (skipList) { + probes.remove(StatusProbeEnum.List); + } + + // if probes are required -request them and evaluate the result. + if (!probes.isEmpty()) { try { - boolean overwrite = flags.contains(CreateFlag.OVERWRITE); // get the status or throw an FNFE. - // when overwriting, there is no need to look for any existing file, - // just a directory (for safety) - FileStatus status = innerGetFileStatus(path, false, - overwrite - ? StatusProbeEnum.DIRECTORIES - : StatusProbeEnum.ALL); + FileStatus status = innerGetFileStatus(path, false, probes); // if the thread reaches here, there is something at the path if (status.isDirectory()) { @@ -2109,6 +2168,10 @@ private FSDataOutputStream innerCreateFile( } catch (FileNotFoundException e) { // this means there is nothing at the path; all good. 
} + } else { + LOG.debug("Skipping all probes with flags:" + + " createPerf={}, magic={}, ccAvailable={}, cCreate={}, cEtag={}", + createPerf, magic, ccAvailable, cCreate, cEtag); } instrumentation.fileCreated(); final BlockOutputStreamStatistics outputStreamStatistics @@ -2117,37 +2180,45 @@ private FSDataOutputStream innerCreateFile( committerIntegration.createTracker(path, key, outputStreamStatistics); String destKey = putTracker.getDestKey(); + EnumSet<WriteObjectFlags> putFlags = options.writeObjectFlags(); + if (conditionalPut) { + putFlags.add(WriteObjectFlags.ConditionalOverwrite); + } + // put options are derived from the option builder. final PutObjectOptions putOptions = - new PutObjectOptions(null, options.getHeaders()); + new PutObjectOptions(null, + options.getHeaders(), + putFlags, + options.etag()); validateOutputStreamConfiguration(path, getConf()); final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() - .withKey(destKey) - .withBlockFactory(blockFactory) - .withBlockSize(partSize) - .withStatistics(outputStreamStatistics) - .withProgress(progress) - .withPutTracker(putTracker) - .withWriteOperations( - createWriteOperationHelper(auditSpan)) - .withExecutorService( - new SemaphoredDelegatingExecutor( - boundedThreadPool, - blockOutputActiveBlocks, - true, - outputStreamStatistics)) - .withDowngradeSyncableExceptions( + .withKey(destKey) + .withBlockFactory(blockFactory) + .withBlockSize(partSize) + .withStatistics(outputStreamStatistics) + .withProgress(progress) + .withPutTracker(putTracker) + .withWriteOperations( + createWriteOperationHelper(auditSpan)) + .withExecutorService( + new SemaphoredDelegatingExecutor( + boundedThreadPool, + blockOutputActiveBlocks, + true, + outputStreamStatistics)) + .withDowngradeSyncableExceptions( getConf().getBoolean( DOWNGRADE_SYNCABLE_EXCEPTIONS, DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)) - .withCSEEnabled(isCSEEnabled) - .withPutOptions(putOptions) - .withIOStatisticsAggregator( - IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(isMultipartUploadEnabled); + .withCSEEnabled(isCSEEnabled) + .withPutOptions(putOptions) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) + .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); @@ -3192,7 +3263,7 @@ private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest) public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, long length, boolean isDirectoryMarker) { - return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker); + return requestFactory.newPutObjectRequestBuilder(key, PutObjectOptions.defaultOptions(), length, isDirectoryMarker); } /** @@ -5301,6 +5372,7 @@ public CommitterStatistics newCommitterStatistics() { public boolean hasPathCapability(final Path path, final String capability) throws IOException { final Path p = makeQualified(path); + final S3AStore store = getStore(); String cap = validatePathCapabilityArgs(p, capability); switch (cap) { @@ -5362,11 +5434,20 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED: return isMultipartUploadEnabled(); - // create file options + // create file options which are always true + + case FS_OPTION_CREATE_IN_CLOSE: + case FS_OPTION_CREATE_CONTENT_TYPE: case FS_S3A_CREATE_PERFORMANCE: case 
FS_S3A_CREATE_HEADER: return true; + // conditional create requires it to be enabled in the FS. + case FS_S3A_CONDITIONAL_CREATE_ENABLED: + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE: + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG: + return conditionalCreateEnabled; + // is the FS configured for create file performance case FS_S3A_CREATE_PERFORMANCE_ENABLED: return performanceFlags.enabled(PerformanceFlagEnum.Create); @@ -5390,8 +5471,8 @@ public boolean hasPathCapability(final Path path, final String capability) } // ask the store for what capabilities it offers - // this may include input and output capabilites -and more - if (getStore() != null && getStore().hasPathCapability(path, capability)) { + // this includes, store configuration flags, IO capabilites...etc. + if (store.hasPathCapability(path, capability)) { return true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 1d26eb62750..b3c907428ac 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1530,7 +1530,9 @@ private OutputStreamStatistics( STREAM_WRITE_TOTAL_DATA.getSymbol(), STREAM_WRITE_TOTAL_TIME.getSymbol(), INVOCATION_HFLUSH.getSymbol(), - INVOCATION_HSYNC.getSymbol()) + INVOCATION_HSYNC.getSymbol(), + CONDITIONAL_CREATE.getSymbol(), + CONDITIONAL_CREATE_FAILED.getSymbol()) .withGauges( STREAM_WRITE_BLOCK_UPLOADS_ACTIVE.getSymbol(), STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(), @@ -1688,6 +1690,15 @@ public void hsyncInvoked() { incCounter(INVOCATION_HSYNC.getSymbol(), 1); } + @Override + public void conditionalCreateOutcome(boolean success) { + if (success) { + incCounter(CONDITIONAL_CREATE.getSymbol(), 1); + } else { + incCounter(CONDITIONAL_CREATE_FAILED.getSymbol(), 1); + } + } + @Override public void close() { if (getBytesPendingUpload() > 0) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index ffd3f5e1155..ee98693e696 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -108,6 +108,12 @@ public enum Statistic { "Filesystem close", TYPE_DURATION), + CONDITIONAL_CREATE(StoreStatisticNames.CONDITIONAL_CREATE, + "Count of successful conditional create operations.", + TYPE_COUNTER), + CONDITIONAL_CREATE_FAILED(StoreStatisticNames.CONDITIONAL_CREATE_FAILED, + "Count of failed conditional create operations.", + TYPE_COUNTER), DIRECTORIES_CREATED("directories_created", "Total number of directories created through the object store.", TYPE_COUNTER), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 969c1023d73..364f780863a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -318,8 +318,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload( retrying, () -> { final CompleteMultipartUploadRequest.Builder requestBuilder = - 
getRequestFactory().newCompleteMultipartUploadRequestBuilder( - destKey, uploadId, partETags); + getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions); return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build()); }); return uploadResult; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index c69e3394c3d..294f0a08f92 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -168,12 +168,14 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( * @param destKey destination object key * @param uploadId ID of initiated upload * @param partETags ordered list of etags + * @param putOptions options for the request * @return the request builder. */ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List<CompletedPart> partETags); + List<CompletedPart> partETags, + PutObjectOptions putOptions); /** * Create a HEAD object request builder. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java index ecc3496ce8f..dafcbe45ac5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.commit.magic; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; @@ -40,6 +42,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; +import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; /** @@ -79,7 +82,10 @@ public boolean aboutToComplete(String uploadId, PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( getOriginalDestKey(), 0, - new PutObjectOptions(null, headers)); + new PutObjectOptions(null, + headers, + EnumSet.noneOf(WriteObjectFlags.class), + "")); upload(originalDestPut, EMPTY); // build the commit summary @@ -103,7 +109,8 @@ public boolean aboutToComplete(String uploadId, getPath(), getPendingPartKey(), commitData); PutObjectRequest put = getWriter().createPutObjectRequest( getPendingPartKey(), - bytes.length, null); + bytes.length, + defaultOptions()); upload(put, bytes); return false; } @@ -117,7 +124,7 @@ public boolean aboutToComplete(String uploadId, @Retries.RetryTranslated private void upload(PutObjectRequest request, byte[] bytes) throws IOException { 
trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(), - () -> getWriter().putObject(request, PutObjectOptions.defaultOptions(), + () -> getWriter().putObject(request, defaultOptions(), new S3ADataBlocks.BlockUploadData(bytes, null), null)); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java index fe7b4514798..8a7dd3b173c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java @@ -38,6 +38,8 @@ public interface AWSHeaders { String DATE = "Date"; String ETAG = "ETag"; String LAST_MODIFIED = "Last-Modified"; + String IF_NONE_MATCH = "If-None-Match"; + String IF_MATCH = "If-Match"; /* * Amazon HTTP Headers used by S3A. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index ae2945989dd..210321b3cdd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.hadoop.conf.Configuration; @@ -33,11 +33,22 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; -import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; import org.apache.hadoop.util.Progressable; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CONTENT_TYPE; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.CreateMultipart; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Performance; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Recursive; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CREATE_FILE_KEYS; +import static org.apache.hadoop.util.Preconditions.checkArgument; /** * Builder used in create file; takes a callback to the operation @@ -63,19 +74,25 @@ public class CreateFileBuilder extends * Classic create file option set: overwriting. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, + EnumSet.of(Recursive), + null, null); /** * Classic create file option set: no overwrite. 
*/ public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE = - new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, + EnumSet.of(Recursive), + null, null); /** * Performance create options. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, + EnumSet.of(Performance,Recursive), + null, null); /** * Callback interface. @@ -109,27 +126,36 @@ public FSDataOutputStream build() throws IOException { final Configuration options = getOptions(); final Map<String, String> headers = new HashMap<>(); final Set<String> mandatoryKeys = getMandatoryKeys(); - final Set<String> keysToValidate = new HashSet<>(); + final EnumSet<WriteObjectFlags> createFileSwitches = EnumSet.noneOf( + WriteObjectFlags.class); // pick up all headers from the mandatory list and strip them before // validating the keys + + // merge the config lists + String headerPrefix = FS_S3A_CREATE_HEADER + "."; final int prefixLen = headerPrefix.length(); - mandatoryKeys.stream().forEach(key -> { - if (key.startsWith(headerPrefix) && key.length() > prefixLen) { - headers.put(key.substring(prefixLen), options.get(key)); - } else { - keysToValidate.add(key); - } - }); + + final Set<String> keysToValidate = mandatoryKeys.stream() + .filter(key -> !key.startsWith(headerPrefix)) + .collect(Collectors.toSet()); rejectUnknownMandatoryKeys(keysToValidate, CREATE_FILE_KEYS, "for " + path); - // and add any optional headers - getOptionalKeys().stream() - .filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen) - .forEach(key -> headers.put(key.substring(prefixLen), options.get(key))); + // look for headers + for (Map.Entry<String, String> option : options) { + String key = option.getKey(); + if (key.startsWith(headerPrefix) && key.length() > prefixLen) { + headers.put(key.substring(prefixLen), option.getValue()); + } + } + + // and add the mimetype + if (options.get(FS_OPTION_CREATE_CONTENT_TYPE, null) != null) { + headers.put(CONTENT_TYPE, options.get(FS_OPTION_CREATE_CONTENT_TYPE, null)); + } EnumSet<CreateFlag> flags = getFlags(); if (flags.contains(CreateFlag.APPEND)) { @@ -142,13 +168,32 @@ public FSDataOutputStream build() throws IOException { "Must specify either create or overwrite"); } - final boolean performance = - options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false); + // build the other switches + if (isRecursive()) { + createFileSwitches.add(Recursive); + } + if (Performance.isEnabled(options)) { + createFileSwitches.add(Performance); + } + if (CreateMultipart.isEnabled(options)) { + createFileSwitches.add(CreateMultipart); + } + if (ConditionalOverwrite.isEnabled(options)) { + createFileSwitches.add(ConditionalOverwrite); + } + // etag is a string so is checked for then extracted. + final String etag = options.get(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, null); + if (etag != null) { + createFileSwitches.add(ConditionalOverwriteEtag); + } + return callbacks.createFileFromBuilder( path, getProgress(), - new CreateFileOptions(flags, isRecursive(), performance, headers)); - + new CreateFileOptions(flags, + createFileSwitches, + etag, + headers)); } /** @@ -209,14 +254,14 @@ public static final class CreateFileOptions { private final EnumSet<CreateFlag> flags; /** - * create parent dirs? + * Create File switches. 
*/ - private final boolean recursive; + private final EnumSet<WriteObjectFlags> writeObjectFlags; /** - * performance flag. + * Etag. Only used if the create file switches enable it. */ - private final boolean performance; + private final String etag; /** * Headers; may be null. @@ -225,18 +270,22 @@ public static final class CreateFileOptions { /** * @param flags creation flags - * @param recursive create parent dirs? - * @param performance performance flag + * @param writeObjectFlags Create File switches. + * @param etag ETag, used only if enabled by switches * @param headers nullable header map. */ public CreateFileOptions( final EnumSet<CreateFlag> flags, - final boolean recursive, - final boolean performance, + final EnumSet<WriteObjectFlags> writeObjectFlags, + final String etag, final Map<String, String> headers) { - this.flags = flags; - this.recursive = recursive; - this.performance = performance; + this.flags = requireNonNull(flags); + this.writeObjectFlags = requireNonNull(writeObjectFlags); + if (writeObjectFlags().contains(ConditionalOverwriteEtag)) { + checkArgument(!isEmpty(etag), + "etag overwrite is enabled but the etag string is null/empty"); + } + this.etag = etag; this.headers = headers; } @@ -244,8 +293,7 @@ public CreateFileOptions( public String toString() { return "CreateFileOptions{" + "flags=" + flags + - ", recursive=" + recursive + - ", performance=" + performance + + ", writeObjectFlags=" + writeObjectFlags + ", headers=" + headers + '}'; } @@ -255,16 +303,36 @@ public EnumSet<CreateFlag> getFlags() { } public boolean isRecursive() { - return recursive; + return isSet(Recursive); } public boolean isPerformance() { - return performance; + return isSet(Performance); + } + + public boolean isConditionalOverwrite() { + return isSet(ConditionalOverwrite); + } + + public boolean isConditionalOverwriteEtag() { + return isSet(ConditionalOverwriteEtag); + } + + public boolean isSet(WriteObjectFlags val) { + return writeObjectFlags().contains(val); } public Map<String, String> getHeaders() { return headers; } + + public String etag() { + return etag; + } + + public EnumSet<WriteObjectFlags> writeObjectFlags() { + return writeObjectFlags; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 5bb64ddc289..8cf435f7ca6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -35,11 +35,16 @@ import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS; import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static 
org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -260,7 +265,14 @@ private InternalConstants() { */ public static final Set<String> CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE))); + new HashSet<>(Arrays.asList( + FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, + FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, + FS_OPTION_CREATE_IN_CLOSE, + FS_OPTION_CREATE_CONTENT_TYPE, + FS_S3A_CREATE_PERFORMANCE, + FS_S3A_CREATE_MULTIPART + ))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java index 1ca502c44cd..c48a0e03c6b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java @@ -18,9 +18,17 @@ package org.apache.hadoop.fs.s3a.impl; +import java.util.EnumSet; import java.util.Map; import javax.annotation.Nullable; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite; +import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag; +import static org.apache.hadoop.util.Preconditions.checkArgument; + /** * Extensible structure for options when putting/writing objects. */ @@ -36,16 +44,71 @@ public final class PutObjectOptions { */ private final Map<String, String> headers; + /** + * Flags to control the write process. + */ + private final EnumSet<WriteObjectFlags> writeObjectFlags; + + /** + * If set, allows overwriting an object only if the object's ETag matches this value. + */ + private final String etagOverwrite; + /** * Constructor. * @param storageClass Storage class, if not null. * @param headers Headers; may be null. + * @param writeObjectFlags flags for writing + * @param etagOverwrite etag for etag writes. + * MUST not be empty if etag overwrite flag is set. */ public PutObjectOptions( @Nullable final String storageClass, - @Nullable final Map<String, String> headers) { + @Nullable final Map<String, String> headers, + final EnumSet<WriteObjectFlags> writeObjectFlags, + @Nullable final String etagOverwrite) { this.storageClass = storageClass; this.headers = headers; + this.writeObjectFlags = writeObjectFlags; + this.etagOverwrite = etagOverwrite; + if (isEtagOverwrite()) { + checkArgument(!isEmpty(etagOverwrite), + "etag overwrite is enabled but the etag string is null/empty"); + } + } + + /** + * Get the noObjectOverwrite flag. + * @return true if object override not allowed. + */ + public boolean isNoObjectOverwrite() { + return hasFlag(ConditionalOverwrite); + } + + /** + * Get the isEtagOverwrite flag. + * @return true if the write MUST overwrite an object with the + * supplied etag. 
+ */ + public boolean isEtagOverwrite() { + return hasFlag(ConditionalOverwriteEtag); + } + + /** + * Does the flag set contain the specific flag. + * @param flag flag to look for + * @return true if the flag is set. + */ + public boolean hasFlag(WriteObjectFlags flag) { + return writeObjectFlags.contains(flag); + } + + /** + * Get the ETag that must match for an overwrite operation to proceed. + * @return The ETag required for overwrite, or {@code null} if no ETag match is required. + */ + public String getEtagOverwrite() { + return etagOverwrite; } /** @@ -56,10 +119,17 @@ public Map<String, String> getHeaders() { return headers; } + public EnumSet<WriteObjectFlags> getWriteObjectFlags() { + return writeObjectFlags; + } + @Override public String toString() { return "PutObjectOptions{" + - ", storageClass='" + storageClass + '\'' + + "storageClass='" + storageClass + '\'' + + ", headers=" + headers + + ", writeObjectFlags=" + writeObjectFlags + + ", etagOverwrite='" + etagOverwrite + '\'' + '}'; } @@ -67,9 +137,12 @@ public String toString() { * Empty options. */ private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions( - null, null); + null, + null, + EnumSet.noneOf(WriteObjectFlags.class), + null); - /** + /** * Get the default options. * @return an instance with no storage class or headers. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index f03b83764b8..6feca522cb6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -60,12 +60,17 @@ import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; +import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_MATCH; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.util.Preconditions.checkNotNull; @@ -372,6 +377,19 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, setRequestTimeout(putObjectRequestBuilder, partUploadTimeout); } + if (options != null) { + if (options.isNoObjectOverwrite()) { + LOG.debug("setting If-None-Match"); + putObjectRequestBuilder.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR)); + } + if (options.hasFlag(WriteObjectFlags.ConditionalOverwriteEtag)) { + LOG.debug("setting If-Match"); + putObjectRequestBuilder.overrideConfiguration( + override -> override.putHeader(IF_MATCH, options.getEtagOverwrite())); + } + } + return 
prepareRequest(putObjectRequestBuilder); } @@ -553,12 +571,26 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List<CompletedPart> partETags) { + List<CompletedPart> partETags, + PutObjectOptions putOptions) { + // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. - CompleteMultipartUploadRequest.Builder requestBuilder = - CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + CompleteMultipartUploadRequest.Builder requestBuilder; + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + + if (putOptions.isNoObjectOverwrite()) { + LOG.debug("setting If-None-Match"); + requestBuilder.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR)); + } + if (!isEmpty(putOptions.getEtagOverwrite())) { + LOG.debug("setting if If-Match"); + requestBuilder.overrideConfiguration( + override -> override.putHeader(IF_MATCH, putOptions.getEtagOverwrite())); + } + // Correct SSE-C request parameters are required for this request when // specifying checksums for each part if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java new file mode 100644 index 00000000000..5918316c810 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.write; + +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; + +/** + * Flags to use when creating/writing objects. 
+ * The configuration key is used in two places: + * <ol> + * <li>Parsing builder options</li> + * <li>hasCapability() probes of the output stream.</li> + * </ol> + */ +public enum WriteObjectFlags { + ConditionalOverwrite(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE), + ConditionalOverwriteEtag(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG), + CreateMultipart(FS_S3A_CREATE_MULTIPART), + Performance(FS_S3A_CREATE_PERFORMANCE), + Recursive(""); + + /** Configuration key, or "" if not configurable. */ + private final String key; + + /** + * Constructor. + * @param key key configuration key, or "" if not configurable. + */ + WriteObjectFlags(final String key) { + this.key = key; + } + + /** + * does the configuration contain this option as a boolean? + * @param options options to scan + * @return true if this is defined as a boolean + */ + public boolean isEnabled(Configuration options) { + return options.getBoolean(key, false); + } + + /** + * Does the key of this option match the parameter? + * @param k key + * @return true if there is a match. + */ + public boolean hasKey(String k) { + return !key.isEmpty() && key.equals(k); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java new file mode 100644 index 00000000000..e0738424fba --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes related to writing objects. + */ +package org.apache.hadoop.fs.s3a.impl.write; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index 6bf2354a83e..db1b7def474 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -146,4 +146,14 @@ public interface BlockOutputStreamStatistics extends Closeable, * Syncable.hsync() has been invoked. */ void hsyncInvoked(); + + /** + * Record the outcome of a conditional create operation. + * <p> + * This method increments the appropriate counter based on whether + * the conditional create operation was successful or failed. + * @param success {@code true} if the conditional create operation succeeded, + * {@code false} if it failed. 
+ */ + void conditionalCreateOutcome(boolean success); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index a5d20095ba5..26b9f2b1568 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -549,6 +549,10 @@ public void hflushInvoked() { public void hsyncInvoked() { } + @Override + public void conditionalCreateOutcome(boolean success) { + } + @Override public void close() throws IOException { } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md index e4b5c76d20c..41fb9a04182 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md @@ -312,18 +312,20 @@ understands the risks. The configuration option `fs.s3a.create.performance` has the same behavior as the `fs.s3a.performance.flag` flag option `create`: -* No overwrite checks are made when creating a file, even if overwrite is set to `false` in the application/library code +* No overwrite checks are made when creating a file, even if overwrite is set to `false` in the application/library code. + Instead S3 conditional creation will be used to perform an atomic overwrite check _when the file write completes_. * No checks are made for an object being written above a path containing other objects (i.e. a "directory") * No checks are made for a parent path containing an object which is not a directory marker (i.e. a "file") This saves multiple probes per operation, especially a `LIST` call. -It may however result in -* Unintentional overwriting of data -* Creation of directory structures which can no longer be navigated through filesystem APIs. +It may however result in creation of directory structures which can no longer be navigated through filesystem APIs. Use with care, and, ideally, enable versioning on the S3 store. +Note that S3 Conditional creation may not be supported on third party stores, +in which case no overwrite checks are performed at all. + ### <a name="mkdir-performance"></a> Mkdir Performance diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md index feffdf0a8b0..f6fea9338a4 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md @@ -41,6 +41,7 @@ The features which may be unavailable include: * List API to use (`fs.s3a.list.version = 1`) * Bucket lifecycle rules to clean up pending uploads. * Support for multipart uploads. +* Conditional file creation. (`fs.s3a.create.conditional.enabled = false`) ### Disabling Change Detection @@ -64,7 +65,10 @@ path style access must also be enabled in `fs.s3a.path.style.access`. The v4 signing algorithm requires a region to be set in `fs.s3a.endpoint.region`. A non-empty value is generally sufficient, though some deployments may require -a specific value. +a specific value. 
+ +*Important:* do not use `auto` or `sdk` as these may be used +in the future for specific region binding algorithms. Finally, assuming the credential source is the normal access/secret key then these must be set, either in XML or (preferred) in a JCEKS file. @@ -150,6 +154,26 @@ If there are any, they are aborted (sequentially). * If any other process is writing to the same directory tree, their operations will be cancelled. +#### Conditional File Creation + +The S3A connector supports conditional file creation, in which applications specifically +written to use the `createFile()` API can either fail the write if an object is found +at the target path when the actual write is committed, or only permit the write to succeed +if an object exists with a specified etag. + +Both options can be used by S3-specific commit protocols: protocols which are unsafe +to use on stores without support for the conditional create feature. + +In such a situation, the option `fs.s3a.create.conditional.enabled` should be set to +false to disable use of these features. + +```xml + <property> + <name>fs.s3a.create.conditional.enabled</name> + <value>false</value> + </property> +``` + # Troubleshooting @@ -464,10 +488,18 @@ this makes renaming and deleting significantly slower. <name>fs.s3a.multipart.uploads.enabled</name> <value>false</value> </property> - <property> + + <property> <name>fs.s3a.optimized.copy.from.local.enabled</name> <value>false</value> </property> + + <!-- No support for conditional file creation --> + <property> + <name>fs.s3a.create.conditional.enabled</name> + <value>false</value> + </property> + </configuration> ``` @@ -497,6 +529,5 @@ It is also a way to regression test foundational S3A third-party store compatibi </configuration> ``` -_Note_ If anyone is set up to test this reguarly, please let the hadoop developer team know if regressions do surface, +_Note_ If anyone is set up to test this regularly, please let the hadoop developer team know if regressions do surface, as it is not a common test configuration.
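To make the new builder options concrete, here is a minimal sketch (not part of the patch itself) of requesting an atomic PUT-no-overwrite through the `createFile()` builder. The option constant and the `RemoteFileChangedException` outcome mirror the integration tests later in this change; the class and method names are invented purely for illustration.

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;

/** Illustrative only: write a file if and only if nothing exists at the target path. */
public final class ConditionalCreateSketch {

  private ConditionalCreateSketch() {
  }

  public static void writeIfAbsent(FileSystem fs, Path path, byte[] data) throws Exception {
    // must() marks the option as mandatory, so a filesystem which does not
    // recognise it rejects the request rather than silently ignoring it.
    try (FSDataOutputStream out = fs.createFile(path)
        .must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true)
        .build()) {
      out.write(data);
      // The overwrite check is performed when close() commits the write;
      // if an object now exists at the path, close() fails
      // (the tests below expect RemoteFileChangedException / HTTP 412).
    }
  }
}
```

The etag variant works the same way through `FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG`, as exercised in `ITestS3APutIfMatchAndIfNoneMatch` below.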
-[] \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java index 508e1a38356..396a9f60a05 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java @@ -332,7 +332,9 @@ public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exce .build(); PutObjectRequest.Builder putObjectRequestBuilder = factory.newPutObjectRequestBuilder(key, - null, SMALL_FILE_SIZE, false); + PutObjectOptions.defaultOptions(), + SMALL_FILE_SIZE, + false); putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE))); putObjectRequestBuilder.metadata(metadata); fs.putObjectDirect(putObjectRequestBuilder.build(), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index ecda6fd2ace..3b30a8e05dc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -107,7 +107,9 @@ public void testPutObjectDirect() throws Throwable { .build(); Path path = path("putDirect"); PutObjectRequest.Builder putObjectRequestBuilder = - factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false); + factory.newPutObjectRequestBuilder(path.toUri().getPath(), + PutObjectOptions.defaultOptions(), + -1, false); putObjectRequestBuilder.contentLength(-1L); LambdaTestUtils.intercept(IllegalStateException.class, () -> fs.putObjectDirect( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 53e4a68cbb6..4d97ab2179b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -69,6 +69,7 @@ import org.apache.hadoop.util.functional.FutureIO; import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; import org.junit.Assert; import org.junit.Assume; import org.junit.AssumptionViolatedException; @@ -1166,6 +1167,14 @@ public static void assumeStoreAwsHosted(final FileSystem fs) { .getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))); } + /** + * Skip if conditional creation is not enabled. + */ + public static void assumeConditionalCreateEnabled(Configuration conf) { + skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_CREATE_ENABLED, + "conditional create is disabled"); + } + /** * Modify the config by setting the performance flags and return the modified config. * @@ -1466,7 +1475,9 @@ public static void assume(String message, boolean condition) { if (!condition) { LOG.warn(message); } - Assume.assumeTrue(message, condition); + Assumptions.assumeThat(condition). 
+ describedAs(message) + .isTrue(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java new file mode 100644 index 00000000000..8528e4779c2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.util.Arrays; +import java.util.Collection; + +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3ATestUtils; + +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS; +import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assumptions.assumeThat; + +@RunWith(Parameterized.class) +public class ITestS3AConditionalCreateBehavior extends AbstractS3ATestBase { + + private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255); + + private final boolean conditionalCreateEnabled; + + public ITestS3AConditionalCreateBehavior(boolean conditionalCreateEnabled) { + this.conditionalCreateEnabled = conditionalCreateEnabled; + } + + @Parameterized.Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + /** + * Asserts that 
the FSDataOutputStream has the conditional create capability enabled. + * + * @param stream The output stream to check. + */ + private static void assertHasCapabilityConditionalCreate(FSDataOutputStream stream) { + Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE)) + .as("Conditional create capability should be enabled") + .isTrue(); + } + + /** + * Asserts that the FSDataOutputStream has the ETag-based conditional create capability enabled. + * + * @param stream The output stream to check. + */ + private static void assertHasCapabilityEtagWrite(FSDataOutputStream stream) { + Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG)) + .as("ETag-based conditional create capability should be enabled") + .isTrue(); + } + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides( + conf, + FS_S3A_CREATE_PERFORMANCE, + FS_S3A_PERFORMANCE_FLAGS, + MULTIPART_SIZE, + MIN_MULTIPART_THRESHOLD, + UPLOAD_PART_COUNT_LIMIT + ); + if (!conditionalCreateEnabled) { + conf.setBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, false); + } + S3ATestUtils.disableFilesystemCaching(conf); + return conf; + } + + @Before + public void setUp() throws Exception { + super.setup(); + } + + @Test + public void testConditionalWrite() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create a file over an empty path + try (FSDataOutputStream stream = fs.create(testFile)) { + stream.write(SMALL_FILE_BYTES); + } + + // attempted conditional overwrite fails + intercept(PathIOException.class, () -> { + FSDataOutputStreamBuilder cf = fs.createFile(testFile); + cf.opt(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true); + try (FSDataOutputStream stream = cf.build()) { + assertHasCapabilityConditionalCreate(stream); + stream.write(SMALL_FILE_BYTES); + } + }); + } + + @Test + public void testWriteWithEtag() throws Throwable { + assumeThat(conditionalCreateEnabled) + .as("Skipping as conditional create is enabled") + .isFalse(); + + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create a file over an empty path + try (FSDataOutputStream stream = fs.create(testFile)) { + stream.write(SMALL_FILE_BYTES); + } + + String etag = ((S3AFileStatus) fs.getFileStatus(testFile)).getEtag(); + Assertions.assertThat(etag) + .as("ETag should not be null after file creation") + .isNotNull(); + + // attempted write with etag. 
should fail + intercept(PathIOException.class, () -> { + FSDataOutputStreamBuilder cf = fs.createFile(testFile); + cf.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag); + try (FSDataOutputStream stream = cf.build()) { + assertHasCapabilityEtagWrite(stream); + stream.write(SMALL_FILE_BYTES); + } + }); + } + + @Test + public void testWriteWithPerformanceFlagAndOverwriteFalse() throws Throwable { + assumeThat(conditionalCreateEnabled) + .as("Skipping as conditional create is enabled") + .isFalse(); + + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create a file over an empty path + try (FSDataOutputStream stream = fs.create(testFile)) { + stream.write(SMALL_FILE_BYTES); + } + + // overwrite with performance flag + FSDataOutputStreamBuilder cf = fs.createFile(testFile); + cf.overwrite(false); + cf.must(FS_S3A_CREATE_PERFORMANCE, true); + IOStatistics ioStatistics; + try (FSDataOutputStream stream = cf.build()) { + stream.write(SMALL_FILE_BYTES); + ioStatistics = S3ATestUtils.getOutputStreamStatistics(stream).getIOStatistics(); + } + // TODO: uncomment when statistics are getting initialised + // verifyStatisticCounterValue(ioStatistics, Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + // verifyStatisticCounterValue(ioStatistics, Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java new file mode 100644 index 00000000000..7939b39935b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Ignore; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.RemoteFileChangedException; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.Statistic;; +import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; + +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS; +import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeConditionalCreateEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; +import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assumptions.assumeThat; + +/** + * Integration tests with conditional overwrites. + * This test class verifies the behavior of "If-Match" and "If-None-Match" + * conditions while writing files. 
+ */ +public class ITestS3APutIfMatchAndIfNoneMatch extends AbstractS3ATestBase { + + private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB; + + private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255); + private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a'); + + private BlockOutputStreamStatistics statistics; + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + + S3ATestUtils.disableFilesystemCaching(conf); + removeBaseAndBucketOverrides( + conf, + FS_S3A_CREATE_PERFORMANCE, + FS_S3A_PERFORMANCE_FLAGS, + MULTIPART_SIZE, + MIN_MULTIPART_THRESHOLD, + UPLOAD_PART_COUNT_LIMIT + ); + conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); + conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD); + conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + Configuration conf = getConfiguration(); + assumeConditionalCreateEnabled(conf); + } + + /** + * Asserts that an S3Exception has the expected HTTP status code. + * + * @param code Expected HTTP status code. + * @param ex Exception to validate. + */ + private static void assertS3ExceptionStatusCode(int code, Exception ex) { + S3Exception s3Exception = (S3Exception) ex.getCause(); + + if (s3Exception.statusCode() != code) { + throw new AssertionError("Expected status code " + code + " from " + ex, ex); + } + } + + /** + * Asserts that the FSDataOutputStream has the conditional create capability enabled. + * + * @param stream The output stream to check. + */ + private static void assertHasCapabilityConditionalCreate(FSDataOutputStream stream) { + Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE)) + .as("Conditional create capability should be enabled") + .isTrue(); + } + + /** + * Asserts that the FSDataOutputStream has the ETag-based conditional create capability enabled. + * + * @param stream The output stream to check. + */ + private static void assertHasCapabilityEtagWrite(FSDataOutputStream stream) { + Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG)) + .as("ETag-based conditional create capability should be enabled") + .isTrue(); + } + + protected String getBlockOutputBufferName() { + return FAST_UPLOAD_BUFFER_ARRAY; + } + + /** + * Creates a file with specified flags and writes data to it. + * + * @param fs The FileSystem instance. + * @param path Path of the file to create. + * @param data Byte data to write into the file. + * @param ifNoneMatchFlag If true, enforces conditional creation. + * @param etag The ETag for conditional writes. + * @param forceMultipart If true, forces multipart upload. + * @return The FileStatus of the created file. + * @throws Exception If an error occurs during file creation. + */ + private static FileStatus createFileWithFlags( + FileSystem fs, + Path path, + byte[] data, + boolean ifNoneMatchFlag, + String etag, + boolean forceMultipart) throws Exception { + try (FSDataOutputStream stream = getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, + forceMultipart)) { + if (ifNoneMatchFlag) { + assertHasCapabilityConditionalCreate(stream); + } + if (etag != null) { + assertHasCapabilityEtagWrite(stream); + } + if (data != null && data.length > 0) { + stream.write(data); + } + } + return fs.getFileStatus(path); + } + + /** + * Overloaded method to create a file without forcing multipart upload. 
+ * + * @param fs The FileSystem instance. + * @param path Path of the file to create. + * @param data Byte data to write into the file. + * @param ifNoneMatchFlag If true, enforces conditional creation. + * @param etag The ETag for conditional writes. + * @return The FileStatus of the created file. + * @throws Exception If an error occurs during file creation. + */ + private static FileStatus createFileWithFlags( + FileSystem fs, + Path path, + byte[] data, + boolean ifNoneMatchFlag, + String etag) throws Exception { + return createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false); + } + + /** + * Opens a file for writing with specific conditional write flags. + * + * @param fs The FileSystem instance. + * @param path Path of the file to open. + * @param ifNoneMatchFlag If true, enables conditional overwrites. + * @param etag The ETag for conditional writes. + * @param forceMultipart If true, forces multipart upload. + * @return The FSDataOutputStream for writing. + * @throws Exception If an error occurs while opening the file. + */ + private static FSDataOutputStream getStreamWithFlags( + FileSystem fs, + Path path, + boolean ifNoneMatchFlag, + String etag, + boolean forceMultipart) throws Exception { + FSDataOutputStreamBuilder builder = fs.createFile(path); + if (ifNoneMatchFlag) { + builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true); + } + if (etag != null) { + builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag); + } + if (forceMultipart) { + builder.opt(FS_S3A_CREATE_MULTIPART, true); + } + return builder.create().build(); + } + + /** + * Opens a file for writing with specific conditional write flags and without forcing multipart upload. + * + * @param fs The FileSystem instance. + * @param path Path of the file to open. + * @param ifNoneMatchFlag If true, enables conditional overwrites. + * @param etag The ETag for conditional writes. + * @return The FSDataOutputStream for writing. + * @throws Exception If an error occurs while opening the file. + */ + private static FSDataOutputStream getStreamWithFlags( + FileSystem fs, + Path path, + boolean ifNoneMatchFlag, + String etag) throws Exception { + return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false); + } + + /** + * Reads the content of a file as a string. + * + * @param fs The FileSystem instance. + * @param path The file path to read. + * @return The content of the file as a string. + * @throws Throwable If an error occurs while reading the file. + */ + private static String readFileContent(FileSystem fs, Path path) throws Throwable { + try (FSDataInputStream inputStream = fs.open(path)) { + return IOUtils.toString(inputStream, StandardCharsets.UTF_8); + } + } + + /** + * Updates the statistics of the output stream. + * + * @param stream The FSDataOutputStream whose statistics should be updated. + */ + private void updateStatistics(FSDataOutputStream stream) { + statistics = S3ATestUtils.getOutputStreamStatistics(stream); + } + + /** + * Retrieves the ETag of a file. + * + * @param fs The FileSystem instance. + * @param path The path of the file. + * @return The ETag associated with the file. + * @throws IOException If an error occurs while fetching the file status. 
+ */ + private static String getEtag(FileSystem fs, Path path) throws IOException { + String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag(); + return etag; + } + + @Test + public void testIfNoneMatchConflictOnOverwrite() throws Throwable { + describe("generate conflict on overwrites"); + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create a file over an empty path: all good + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null); + + // attempted overwrite fails + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); + + // second attempt also fails + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + + // Delete file and verify an overwrite works again + fs.delete(testFile, false); + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null); + } + + @Test + public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // Skip if multipart upload not supported + assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .as("Skipping as multipart upload not supported") + .isTrue(); + + createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true); + + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); + + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + } + + @Test + public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // Skip test if multipart uploads are not supported + assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .as("Skipping as multipart upload not supported") + .isTrue(); + + // Create a file with multipart upload but do not close the stream + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true); + assertHasCapabilityConditionalCreate(stream); + stream.write(MULTIPART_FILE_BYTES); + + // create and close another small file in parallel + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null); + + // Closing the first stream should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // Skip test if multipart uploads are not supported + assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .as("Skipping as multipart upload not supported") + .isTrue(); + + // Create a file with multipart upload but do not 
close the stream + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true); + assertHasCapabilityConditionalCreate(stream); + stream.write(MULTIPART_FILE_BYTES); + + // create and close another multipart file in parallel + createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true); + + // Closing the first stream should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create a non-empty file + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null); + + // overwrite with zero-byte file (no write) + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null); + assertHasCapabilityConditionalCreate(stream); + + // close the stream, should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create an empty file (no write) + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null); + assertHasCapabilityConditionalCreate(stream); + stream.close(); + + // overwrite with non-empty file, should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create an empty file (no write) + FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, null); + assertHasCapabilityConditionalCreate(stream1); + stream1.close(); + + // overwrite with another empty file, should throw RemoteFileChangedException + FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null); + assertHasCapabilityConditionalCreate(stream2); + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfMatchOverwriteWithCorrectEtag() throws Throwable { + FileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(path.getParent()); + + // Create a file + createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null); + + // Retrieve the etag from the created file + String etag = getEtag(fs, path); + Assertions.assertThat(etag) + .as("ETag should not be null after file creation") + .isNotNull(); + + String updatedFileContent = "Updated content"; + byte[] updatedData = updatedFileContent.getBytes(StandardCharsets.UTF_8); + + // overwrite file with etag + createFileWithFlags(fs, path, updatedData, false, etag); + + // read file and verify overwritten content + String fileContent = readFileContent(fs, path); + Assertions.assertThat(fileContent) + .as("File content should be correctly updated after 
overwriting with the correct ETag") + .isEqualTo(updatedFileContent); + } + + @Test + public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable { + FileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(path.getParent()); + + // Create a file + createFileWithFlags(fs, path, SMALL_FILE_BYTES, true, null); + + // Retrieve the etag from the created file + String etag = getEtag(fs, path); + Assertions.assertThat(etag) + .as("ETag should not be null after file creation") + .isNotNull(); + + // Overwrite the file. Will update the etag, making the previously fetched etag outdated. + createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null); + + // overwrite file with outdated etag. Should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, + () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable { + FileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(path.getParent()); + + // Create a file + createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null); + + // Retrieve the etag from the created file + String etag = getEtag(fs, path); + Assertions.assertThat(etag) + .as("ETag should not be null after file creation") + .isNotNull(); + + // delete the file + fs.delete(path); + + // overwrite file with etag. Should throw FileNotFoundException + FileNotFoundException exception = intercept(FileNotFoundException.class, + () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag)); + assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception); + } + + @Test + public void testIfMatchOverwriteFileWithEmptyEtag() throws Throwable { + FileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(path.getParent()); + + // Create a file + createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null); + + // overwrite file with empty etag. Should throw IllegalArgumentException + intercept(IllegalArgumentException.class, + () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, "")); + } + + @Test + public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // Skip test if multipart uploads are not supported + assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .as("Skipping as multipart upload not supported") + .isTrue(); + + // Create a file and retrieve its etag + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null); + String etag = getEtag(fs, testFile); + Assertions.assertThat(etag) + .as("ETag should not be null after file creation") + .isNotNull(); + + // Start two multipart uploads with the same etag + FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, false, etag, true); + assertHasCapabilityEtagWrite(stream1); + + FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, false, etag, true); + assertHasCapabilityEtagWrite(stream2); + + // Write data to both streams + stream1.write(MULTIPART_FILE_BYTES); + stream2.write(MULTIPART_FILE_BYTES); + + // Close the first stream successfully. 
Will update the etag + stream1.close(); + + // Close second stream, should fail due to etag mismatch + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Ignore("conditional_write statistics not yet fully implemented") + @Test + public void testConditionalWriteStatisticsWithoutIfNoneMatch() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // write without an If-None-Match + // conditional_write, conditional_write_statistics should remain 0 + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, false, null, false); + updateStatistics(stream); + stream.write(SMALL_FILE_BYTES); + stream.close(); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + + // write with overwrite = true + // conditional_write, conditional_write_statistics should remain 0 + try (FSDataOutputStream outputStream = fs.create(testFile, true)) { + outputStream.write(SMALL_FILE_BYTES); + updateStatistics(outputStream); + } + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + + // write in path where file already exists with overwrite = false + // conditional_write, conditional_write_statistics should remain 0 + try (FSDataOutputStream outputStream = fs.create(testFile, false)) { + outputStream.write(SMALL_FILE_BYTES); + updateStatistics(outputStream); + } catch (FileAlreadyExistsException e) {} + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + + // delete the file + fs.delete(testFile, false); + + // write in path where file doesn't exist with overwrite = false + // conditional_write, conditional_write_statistics should remain 0 + try (FSDataOutputStream outputStream = fs.create(testFile, false)) { + outputStream.write(SMALL_FILE_BYTES); + updateStatistics(outputStream); + } + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + } + + @Ignore("conditional_write statistics not yet fully implemented") + @Test + public void testConditionalWriteStatisticsWithIfNoneMatch() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, false); + updateStatistics(stream); + stream.write(SMALL_FILE_BYTES); + stream.close(); + + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0); + + intercept(RemoteFileChangedException.class, + () -> { + // try again with If-None-Match. should fail + FSDataOutputStream s = getStreamWithFlags(fs, testFile, true, null, false); + updateStatistics(s); + s.write(SMALL_FILE_BYTES); + s.close(); + return "Second write using If-None-Match should have failed due to existing file." 
+ s; + } + ); + + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1); + verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1); + } + + /** + * Tests that a conditional create operation is triggered when the performance flag is enabled + * and the overwrite option is set to false. + */ + @Test + public void testConditionalCreateWhenPerformanceFlagEnabledAndOverwriteDisabled() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // Create a file + createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null); + + // Attempt to override the file without overwrite and performance flag. + // Should throw RemoteFileChangedException (due to conditional write operation) + intercept(RemoteFileChangedException.class, () -> { + FSDataOutputStreamBuilder cf = fs.createFile(testFile); + cf.overwrite(false); + cf.must(FS_S3A_CREATE_PERFORMANCE, true); + try (FSDataOutputStream stream = cf.build()) { + assertHasCapabilityConditionalCreate(stream); + stream.write(SMALL_FILE_BYTES); + updateStatistics(stream); + } + }); + + // TODO: uncomment when statistics are getting initialised + // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0); + // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index 65d7aa6192d..c9682abab3e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -37,6 +37,7 @@ import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; /** * Unit test of {@link CreateFileBuilder}. @@ -89,11 +90,13 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") + .must(FS_S3A_CREATE_HEADER + "." 
+ IF_NONE_MATCH, "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map<String, String> headers = build(builder).getHeaders(); Assertions.assertThat(headers) .containsEntry("retention", "permanent") - .containsEntry("owner", "engineering"); + .containsEntry("owner", "engineering") + .containsEntry(IF_NONE_MATCH, "*"); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 58e2816eba7..36a0b8102b6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -22,6 +22,8 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; import java.util.Base64; import java.util.concurrent.atomic.AtomicLong; @@ -50,9 +52,11 @@ import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; +import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.assertj.core.api.Assertions.assertThat; @@ -104,7 +108,8 @@ public void testRequestFactoryWithCannedACL() throws Throwable { String path2 = "path2"; HeadObjectResponse md = HeadObjectResponse.builder().contentLength(128L).build(); - Assertions.assertThat(factory.newPutObjectRequestBuilder(path, null, 128, false) + Assertions.assertThat(factory.newPutObjectRequestBuilder(path, + defaultOptions(), 128, false) .build() .acl() .toString()) @@ -175,7 +180,10 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>())); + new ArrayList<>(), new PutObjectOptions("some class", + Collections.emptyMap(), + EnumSet.noneOf(WriteObjectFlags.class), + ""))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); @@ -188,7 +196,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { a(factory.newListObjectsV2RequestBuilder(path, "/", 1)); a(factory.newMultipartUploadRequestBuilder(path, null)); a(factory.newPutObjectRequestBuilder(path, - PutObjectOptions.defaultOptions(), -1, true)); + defaultOptions(), -1, true)); } /** @@ -272,7 +280,7 @@ public void testUploadTimeouts() throws Throwable { // A simple PUT final PutObjectRequest put = factory.newPutObjectRequestBuilder(path, - PutObjectOptions.defaultOptions(), 1024, false).build(); + defaultOptions(), 1024, false).build(); assertApiTimeouts(partDuration, put); // multipart part @@ -329,8 +337,14 @@ public void testCompleteMultipartUploadRequestWithChecksumAlgorithmAndSSEC() thr .build(); createFactoryObjects(factory); + PutObjectOptions putObjectOptions = new PutObjectOptions( + null, + null, + EnumSet.noneOf(WriteObjectFlags.class), + null); + final CompleteMultipartUploadRequest request = - 
factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>()) + factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>(), putObjectOptions) .build(); Assertions.assertThat(request.sseCustomerAlgorithm()) .isEqualTo(ServerSideEncryption.AES256.name()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java index 4379c246680..c1dda84d7e0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; @@ -55,6 +56,7 @@ import static org.apache.hadoop.fs.s3a.performance.OperationCost.HEAD_OPERATION; import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_OPERATION; import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Assert cost of createFile operations, especially @@ -199,7 +201,9 @@ public void testCreateBuilderSequence() throws Throwable { () -> buildFile(testFile, false, true, GET_FILE_STATUS_ON_FILE)); } else { - buildFile(testFile, false, true, NO_HEAD_OR_LIST); + // will trigger conditional create and throw RemoteFileChangedException + intercept(RemoteFileChangedException.class, + () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java index 8addbbe3049..e1a01eee214 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.api.RequestFactory; -import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.util.functional.RemoteIterators; @@ -58,6 +57,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; +import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; @@ -250,10 +250,10 @@ public void testMultiPagesListingPerformanceAndCorrectness() originalListOfFiles.add(file.toString()); PutObjectRequest.Builder putObjectRequestBuilder = requestFactory .newPutObjectRequestBuilder(fs.pathToKey(file), - null, 0, 
false); + defaultOptions(), 0, false); futures.add(submit(executorService, () -> writeOperationHelper.putObject(putObjectRequestBuilder.build(), - PutObjectOptions.defaultOptions(), + defaultOptions(), new S3ADataBlocks.BlockUploadData(new byte[0], null), null))); } LOG.info("Waiting for PUTs to complete"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org