[ 
https://issues.apache.org/jira/browse/HADOOP-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924188#comment-17924188
 ] 

ASF GitHub Bot commented on HADOOP-19256:
-----------------------------------------

steveloughran commented on code in PR #7329:
URL: https://github.com/apache/hadoop/pull/7329#discussion_r1932400430


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private Configuration conf;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+        conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        conf = createConfiguration();

Review Comment:
   not needed as `super.setup()` creates the config. Instead use
   ```
   Configuration conf = getConfiguration();
   ```
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
             Path path,
             byte[] data,
             String ifMatchTag) throws Exception {
-          FSDataOutputStreamBuilder builder = fs.createFile(path);
-          builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
-          FSDataOutputStream stream = builder.create().build();
-          if (data != null && data.length > 0) {
-              stream.write(data);
-          }
-          stream.close();
-          IOUtils.closeStream(stream);
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+        builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+        FSDataOutputStream stream = builder.create().build();
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+        IOUtils.closeStream(stream);
     }
 
     @Test
     public void testPutIfAbsentConflict() throws Throwable {
         FileSystem fs = getFileSystem();
         Path testFile = methodPath();
-
         fs.mkdirs(testFile.getParent());
         byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
 
+        createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
         RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
                 () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, 
"*"));
         assertS3ExceptionStatusCode(412, firstException);

Review Comment:
   we've got a constant in org.apache.hadoop.fs.s3a.impl.InternalConstants for 
this number



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private Configuration conf;

Review Comment:
   cut



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   import ordering is wrong...I'll give you the IDE settings for this



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -62,6 +62,7 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;

Review Comment:
   move below line 67



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
             Path path,
             byte[] data,
             String ifMatchTag) throws Exception {
-          FSDataOutputStreamBuilder builder = fs.createFile(path);
-          builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
-          FSDataOutputStream stream = builder.create().build();
-          if (data != null && data.length > 0) {
-              stream.write(data);
-          }
-          stream.close();
-          IOUtils.closeStream(stream);
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+        builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+        FSDataOutputStream stream = builder.create().build();

Review Comment:
   use try-with-resources here.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId,
     PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
         getOriginalDestKey(),
         0,
-        new PutObjectOptions(null, headers));
+        new PutObjectOptions(true,false, null, headers));

Review Comment:
   style nit: put a space after the comma. checkstyle will complain about this



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
  */
 public final class PutObjectOptions {
 
+  /**
+   * Can the PUT operation skip marker deletion?
+   */
+  private final boolean keepMarkers;
+
+  /**
+   * Is this a conditional PUT operation
+   */
+  private final boolean conditionalPutEnabled;

Review Comment:
   given there's now an etag match too, maybe we should give this a name, such as
   
   `noObjectOverwrite`
   
   we can then add a new one afterwards, `etagOverwrite`, which will be a string
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -533,12 +534,19 @@ public CreateMultipartUploadRequest.Builder 
newMultipartUploadRequestBuilder(
   public CompleteMultipartUploadRequest.Builder 
newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags) {
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions) {
+
     // a copy of the list is required, so that the AWS SDK doesn't
     // attempt to sort an unmodifiable list.
-    CompleteMultipartUploadRequest.Builder requestBuilder =
-        
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+    CompleteMultipartUploadRequest.Builder requestBuilder;
+    Map<String, String> optionHeaders = putOptions.getHeaders();
+    requestBuilder = 
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
             
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+    if (putOptions.isconditionalPutEnabled()) {
+      requestBuilder.overrideConfiguration(

Review Comment:
   this is from the first patch, isn't it? I'll have to look at my comments 
there



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
  */
 public final class PutObjectOptions {
 
+  /**
+   * Can the PUT operation skip marker deletion?
+   */
+  private final boolean keepMarkers;
+
+  /**
+   * Is this a conditional PUT operation

Review Comment:
   add a ? at the end



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private Configuration conf;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+        conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        conf = createConfiguration();
+        skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
+                "Skipping IfNoneMatch tests");
+    }
+
+    private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+        S3Exception s3Exception = (S3Exception) ex.getCause();
+
+        if (s3Exception.statusCode() != code) {
+            throw new AssertionError("Expected status code " + code + " from " 
+ ex, ex);
+        }
+    }
+
+    protected String getBlockOutputBufferName() {
+        return FAST_UPLOAD_BUFFER_ARRAY;
+    }
+
+    /**
+     * Create a file using the PutIfMatch feature from S3
+     * @param fs filesystem
+     * @param path       path to write
+     * @param data source dataset. Can be null
+     * @throws IOException on any problem
+     */
+    private static void createFileWithIfNoneMatchFlag(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            String ifMatchTag) throws Exception {
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+        builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+        FSDataOutputStream stream = builder.create().build();
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+        IOUtils.closeStream(stream);
+    }
+
+    @Test
+    public void testPutIfAbsentConflict() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+        byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+        createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
+        RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, 
"*"));
+        assertS3ExceptionStatusCode(412, firstException);
+
+        RemoteFileChangedException secondException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, 
"*"));
+        assertS3ExceptionStatusCode(412, secondException);
+    }
+
+
+    @Test
+    public void testPutIfAbsentLargeFileConflict() throws Throwable {

Review Comment:
   I'd offered a way to test this without needing to create a large file for a 
multipart upload...we can work on this...anything uploading 6 MB gets moved 
into the scale tests, so doesn't get run as often



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -39,15 +49,36 @@ public final class PutObjectOptions {
   /**
    * Constructor.
    * @param storageClass Storage class, if not null.
+   * @param conditionalPutEnabled Is this a conditional Put?
    * @param headers Headers; may be null.
    */
   public PutObjectOptions(
+      final boolean keepMarkers,

Review Comment:
   the `keepMarkers` option is something I've cut from trunk. It's come in here 
during rebase/merge work.
   
   Proposed:
   * you move the new argument to after the `headers` entry
   * remove the `keepMarkers` argument/field and all uses of it in this class.





> S3A: Support S3 Conditional Writes
> ----------------------------------
>
>                 Key: HADOOP-19256
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19256
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3 Conditional Write (Put-if-absent) capability is now generally available - 
> [https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/]
>  
> S3A should allow passing in this put-if-absent header to prevent overwriting 
> of files. 
> There is a feature branch for this: HADOOP-19256-s3-conditional-writes
> + support etags to allow an overwrite to be restricted to overwriting a 
> specific version. This can be done through a createFile option.
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html
> Fun fact: third-party stores will not reject overwrites if they don't 
> recognise the headers, so there's no way to be sure they are supported 
> without testing.
> we need a flag to enable/disable conditional writes which can be exposed in a 
> hasPathCapability()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to