[
https://issues.apache.org/jira/browse/HADOOP-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883393#comment-17883393
]
ASF GitHub Bot commented on HADOOP-19256:
-----------------------------------------
steveloughran commented on code in PR #7011:
URL: https://github.com/apache/hadoop/pull/7011#discussion_r1769075759
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static
org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ATestBase {
Review Comment:
should subclass AbstractS3ACostTest so we can measure that a HEAD was not issued in
createFile, at least if we remove that check in this PR. It might be simplest to
leave it in right now and remove it in the create() follow up.
It is still good to change the superclass to allow for cost assertions
later.
Override setup to check the if-none-match flag to skip on third party store
tests; use skipIfNotEnabled()
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static
org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ATestBase {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(conf,
+ MULTIPART_SIZE,
+ UPLOAD_PART_COUNT_LIMIT);
+ conf.setLong(MULTIPART_SIZE, MPU_SIZE);
+ conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+ return conf;
+ }
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BUFFER_ARRAY;
+ }
+
+ /**
+ * Create a file using the PutIfMatch feature from S3
+ * @param fs filesystem
+ * @param path path to write
+ * @param data source dataset. Can be null
+ * @throws IOException on any problem
+ */
+ private static void createFileWithIfNoneMatchFlag(FileSystem fs,
+ Path path,
+ byte[] data,
+ String ifMatchTag)
throws Exception {
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
+ if (data != null && data.length > 0) {
+ stream.write(data);
+ }
+ stream.close();
+ IOUtils.closeStream(stream);
+ }
+
+ @Test
+ public void testPutIfAbsentConflict() throws IOException {
+ FileSystem fs = getFileSystem();
+ Path testFile = methodPath();
+
+ fs.mkdirs(testFile.getParent());
+ byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+ try {
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ } catch (Exception e) {
+ Assert.assertEquals(RemoteFileChangedException.class, e.getClass());
+
+ S3Exception s3Exception = (S3Exception) e.getCause();
+ Assert.assertEquals(s3Exception.statusCode(), 412);
Review Comment:
we've moved to AssertJ now in new tests.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static
org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ATestBase {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(conf,
+ MULTIPART_SIZE,
+ UPLOAD_PART_COUNT_LIMIT);
+ conf.setLong(MULTIPART_SIZE, MPU_SIZE);
+ conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+ return conf;
+ }
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BUFFER_ARRAY;
+ }
+
+ /**
+ * Create a file using the PutIfMatch feature from S3
+ * @param fs filesystem
+ * @param path path to write
+ * @param data source dataset. Can be null
+ * @throws IOException on any problem
+ */
+ private static void createFileWithIfNoneMatchFlag(FileSystem fs,
+ Path path,
+ byte[] data,
+ String ifMatchTag)
throws Exception {
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
+ if (data != null && data.length > 0) {
+ stream.write(data);
+ }
+ stream.close();
+ IOUtils.closeStream(stream);
+ }
+
+ @Test
+ public void testPutIfAbsentConflict() throws IOException {
+ FileSystem fs = getFileSystem();
+ Path testFile = methodPath();
+
+ fs.mkdirs(testFile.getParent());
+ byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+ try {
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ } catch (Exception e) {
+ Assert.assertEquals(RemoteFileChangedException.class, e.getClass());
+
+ S3Exception s3Exception = (S3Exception) e.getCause();
+ Assert.assertEquals(s3Exception.statusCode(), 412);
+ }
+ }
+
+
+ @Test
+ public void testPutIfAbsentLargeFileConflict() throws IOException {
+ FileSystem fs = getFileSystem();
+ Path testFile = methodPath();
+
+ fs.mkdirs(testFile.getParent());
Review Comment:
not needed
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -517,12 +518,22 @@ public CreateMultipartUploadRequest.Builder
newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder
newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
- List<CompletedPart> partETags) {
+ List<CompletedPart> partETags,
+ PutObjectOptions putOptions) {
+
// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
- CompleteMultipartUploadRequest.Builder requestBuilder =
-
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+ CompleteMultipartUploadRequest.Builder requestBuilder;
+ Map<String, String> optionHeaders = putOptions.getHeaders();
Review Comment:
Add a boolean flag to put options, so that it can be enabled in different
bits of the code as appropriate...matters for future features
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static
org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ATestBase {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(conf,
+ MULTIPART_SIZE,
+ UPLOAD_PART_COUNT_LIMIT);
+ conf.setLong(MULTIPART_SIZE, MPU_SIZE);
+ conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+ return conf;
+ }
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BUFFER_ARRAY;
+ }
+
+ /**
+ * Create a file using the PutIfMatch feature from S3
+ * @param fs filesystem
+ * @param path path to write
+ * @param data source dataset. Can be null
+ * @throws IOException on any problem
+ */
+ private static void createFileWithIfNoneMatchFlag(FileSystem fs,
+ Path path,
+ byte[] data,
+ String ifMatchTag)
throws Exception {
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
+ if (data != null && data.length > 0) {
+ stream.write(data);
+ }
+ stream.close();
+ IOUtils.closeStream(stream);
+ }
+
+ @Test
+ public void testPutIfAbsentConflict() throws IOException {
+ FileSystem fs = getFileSystem();
+ Path testFile = methodPath();
+
+ fs.mkdirs(testFile.getParent());
+ byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+ try {
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+ } catch (Exception e) {
+ Assert.assertEquals(RemoteFileChangedException.class, e.getClass());
+
+ S3Exception s3Exception = (S3Exception) e.getCause();
Review Comment:
I was going to point you at `GenericTestUtils.assertExceptionContains()` but
it doesn't validate the cause type or return the value as a cast type the way
`intercept()` does. Time for a new assert there.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -517,12 +518,22 @@ public CreateMultipartUploadRequest.Builder
newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder
newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
- List<CompletedPart> partETags) {
+ List<CompletedPart> partETags,
+ PutObjectOptions putOptions) {
+
// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
- CompleteMultipartUploadRequest.Builder requestBuilder =
-
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+ CompleteMultipartUploadRequest.Builder requestBuilder;
+ Map<String, String> optionHeaders = putOptions.getHeaders();
+
+ if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) {
+ requestBuilder =
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+ .overrideConfiguration(override
->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match")))
Review Comment:
move to AWSHeaders and use everywhere
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java:
##########
@@ -224,10 +224,11 @@ public static final class CreateFileOptions {
private final Map<String, String> headers;
/**
- * @param flags creation flags
- * @param recursive create parent dirs?
+ * @param flags creation flags
Review Comment:
revert
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java:
##########
@@ -224,10 +224,11 @@ public static final class CreateFileOptions {
private final Map<String, String> headers;
/**
- * @param flags creation flags
- * @param recursive create parent dirs?
+ * @param flags creation flags
+ * @param recursive create parent dirs?
* @param performance performance flag
- * @param headers nullable header map.
+ * @param
Review Comment:
cut
> S3A: Support S3 Conditional Writes
> ----------------------------------
>
> Key: HADOOP-19256
> URL: https://issues.apache.org/jira/browse/HADOOP-19256
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 Conditional Write (Put-if-absent) capability is now generally available -
> [https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/]
>
> S3A should allow passing in this put-if-absent header to prevent overwriting
> of files.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]