[
https://issues.apache.org/jira/browse/HADOOP-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924188#comment-17924188
]
ASF GitHub Bot commented on HADOOP-19256:
-----------------------------------------
steveloughran commented on code in PR #7329:
URL: https://github.com/apache/hadoop/pull/7329#discussion_r1932400430
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+ private Configuration conf;
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(
+ conf,
+ MULTIPART_SIZE,
+ UPLOAD_PART_COUNT_LIMIT,
+ MIN_MULTIPART_THRESHOLD);
+ conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ return conf;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ conf = createConfiguration();
Review Comment:
not needed as `super.setup()` creates the config. Instead use
```
Configuration conf = getConfiguration();
```
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
Path path,
byte[] data,
String ifMatchTag) throws Exception {
- FSDataOutputStreamBuilder builder = fs.createFile(path);
- builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
- FSDataOutputStream stream = builder.create().build();
- if (data != null && data.length > 0) {
- stream.write(data);
- }
- stream.close();
- IOUtils.closeStream(stream);
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+ builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
+ if (data != null && data.length > 0) {
+ stream.write(data);
+ }
+ stream.close();
+ IOUtils.closeStream(stream);
}
@Test
public void testPutIfAbsentConflict() throws Throwable {
FileSystem fs = getFileSystem();
Path testFile = methodPath();
-
fs.mkdirs(testFile.getParent());
byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
RemoteFileChangedException firstException =
intercept(RemoteFileChangedException.class,
() -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes,
"*"));
assertS3ExceptionStatusCode(412, firstException);
Review Comment:
we've got a constant in org.apache.hadoop.fs.s3a.impl.InternalConstants for
this number
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+ private Configuration conf;
Review Comment:
cut
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
Review Comment:
import ordering is wrong... I'll give you the IDE settings for this
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -62,6 +62,7 @@
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
Review Comment:
move below line 67
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
Path path,
byte[] data,
String ifMatchTag) throws Exception {
- FSDataOutputStreamBuilder builder = fs.createFile(path);
- builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
- FSDataOutputStream stream = builder.create().build();
- if (data != null && data.length > 0) {
- stream.write(data);
- }
- stream.close();
- IOUtils.closeStream(stream);
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+ builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
Review Comment:
use try-with-resources here.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId,
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
getOriginalDestKey(),
0,
- new PutObjectOptions(null, headers));
+ new PutObjectOptions(true,false, null, headers));
Review Comment:
style nit: put a space after the comma. checkstyle will complain about this
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
*/
public final class PutObjectOptions {
+ /**
+ * Can the PUT operation skip marker deletion?
+ */
+ private final boolean keepMarkers;
+
+ /**
+ * Is this a conditional PUT operation
+ */
+ private final boolean conditionalPutEnabled;
Review Comment:
Given there's now an etag match too, maybe we should give this a more specific
name, such as `noObjectOverwrite`.
We can then add a new one afterwards, `etagOverwrite`, which will be a string.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -533,12 +534,19 @@ public CreateMultipartUploadRequest.Builder
newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder
newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
- List<CompletedPart> partETags) {
+ List<CompletedPart> partETags,
+ PutObjectOptions putOptions) {
+
// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
- CompleteMultipartUploadRequest.Builder requestBuilder =
-
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+ CompleteMultipartUploadRequest.Builder requestBuilder;
+ Map<String, String> optionHeaders = putOptions.getHeaders();
+ requestBuilder =
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+ if (putOptions.isconditionalPutEnabled()) {
+ requestBuilder.overrideConfiguration(
Review Comment:
this is from the first patch, isn't it? I'll have to look at my comments
there
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
*/
public final class PutObjectOptions {
+ /**
+ * Can the PUT operation skip marker deletion?
+ */
+ private final boolean keepMarkers;
+
+ /**
+ * Is this a conditional PUT operation
Review Comment:
add a ? at the end
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+ private Configuration conf;
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(
+ conf,
+ MULTIPART_SIZE,
+ UPLOAD_PART_COUNT_LIMIT,
+ MIN_MULTIPART_THRESHOLD);
+ conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ return conf;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
+ "Skipping IfNoneMatch tests");
+ }
+
+ private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+ S3Exception s3Exception = (S3Exception) ex.getCause();
+
+ if (s3Exception.statusCode() != code) {
+ throw new AssertionError("Expected status code " + code + " from "
+ ex, ex);
+ }
+ }
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BUFFER_ARRAY;
+ }
+
+ /**
+ * Create a file using the PutIfMatch feature from S3
+ * @param fs filesystem
+ * @param path path to write
+ * @param data source dataset. Can be null
+ * @throws IOException on any problem
+ */
+ private static void createFileWithIfNoneMatchFlag(
+ FileSystem fs,
+ Path path,
+ byte[] data,
+ String ifMatchTag) throws Exception {
+ FSDataOutputStreamBuilder builder = fs.createFile(path);
+ builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+ builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+ FSDataOutputStream stream = builder.create().build();
+ if (data != null && data.length > 0) {
+ stream.write(data);
+ }
+ stream.close();
+ IOUtils.closeStream(stream);
+ }
+
+ @Test
+ public void testPutIfAbsentConflict() throws Throwable {
+ FileSystem fs = getFileSystem();
+ Path testFile = methodPath();
+ fs.mkdirs(testFile.getParent());
+ byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+ createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
+ RemoteFileChangedException firstException =
intercept(RemoteFileChangedException.class,
+ () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes,
"*"));
+ assertS3ExceptionStatusCode(412, firstException);
+
+ RemoteFileChangedException secondException =
intercept(RemoteFileChangedException.class,
+ () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes,
"*"));
+ assertS3ExceptionStatusCode(412, secondException);
+ }
+
+
+ @Test
+ public void testPutIfAbsentLargeFileConflict() throws Throwable {
Review Comment:
I'd offered a way to test this without needing to create a large file for a
multipart upload...we can work on this...anything uploading 6 MB gets moved
into the scale tests, so doesn't get run as often
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -39,15 +49,36 @@ public final class PutObjectOptions {
/**
* Constructor.
* @param storageClass Storage class, if not null.
+ * @param conditionalPutEnabled Is this a conditional Put?
* @param headers Headers; may be null.
*/
public PutObjectOptions(
+ final boolean keepMarkers,
Review Comment:
The keepmarkers option is something I've cut from trunk. It's come in here
during rebase merge work.
Proposed:
* you move the new argument to after the `headers` entry
* remove the keepmarkers argument/field and all uses of it in this class.
> S3A: Support S3 Conditional Writes
> ----------------------------------
>
> Key: HADOOP-19256
> URL: https://issues.apache.org/jira/browse/HADOOP-19256
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 Conditional Write (Put-if-absent) capability is now generally available -
> [https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/]
>
> S3A should allow passing in this put-if-absent header to prevent overwriting
> of files.
> There is a feature branch for this: HADOOP-19256-s3-conditional-writes
> + support etags to allow an overwrite to be restricted to overwriting a
> specific version. This can be done through a createFile option.
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html
> Fun fact: third-party stores will not reject overwrites if they don't
> recognise the headers, so there's no way to be sure they are supported
> without testing.
> we need a flag to enable/disable conditional writes which can be exposed in a
> hasPathCapability()
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]