steveloughran commented on code in PR #7329: URL: https://github.com/apache/hadoop/pull/7329#discussion_r1932400430
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+  private Configuration conf;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    removeBaseAndBucketOverrides(
+        conf,
+        MULTIPART_SIZE,
+        UPLOAD_PART_COUNT_LIMIT,
+        MIN_MULTIPART_THRESHOLD);
+    conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    conf = createConfiguration();

Review Comment:
   not needed as `super.setup()` creates the config.
   Instead use
   ```
   Configuration conf = getConfiguration();
   ```
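   To illustrate, the setup override would then reduce to something like the sketch below. This is only a sketch built from the reviewer's snippet: it assumes the base class has already applied `createConfiguration()` and that the `skipIfNotEnabled` probe stays where it is.
   ```java
   @Override
   public void setup() throws Exception {
     super.setup();
     // super.setup() has already created and applied createConfiguration();
     // just pick up the live configuration rather than building a second one.
     Configuration conf = getConfiguration();
     skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
         "Skipping IfNoneMatch tests");
   }
   ```
   With that change the `private Configuration conf` field flagged further down can be removed as well.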
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
       Path path,
       byte[] data,
       String ifMatchTag) throws Exception {
-    FSDataOutputStreamBuilder builder = fs.createFile(path);
-    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
-    FSDataOutputStream stream = builder.create().build();
-    if (data != null && data.length > 0) {
-      stream.write(data);
-    }
-    stream.close();
-    IOUtils.closeStream(stream);
+    FSDataOutputStreamBuilder builder = fs.createFile(path);
+    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+    builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+    FSDataOutputStream stream = builder.create().build();
+    if (data != null && data.length > 0) {
+      stream.write(data);
+    }
+    stream.close();
+    IOUtils.closeStream(stream);
   }
 
   @Test
   public void testPutIfAbsentConflict() throws Throwable {
     FileSystem fs = getFileSystem();
     Path testFile = methodPath();
-    fs.mkdirs(testFile.getParent());
     byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+    createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
     RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
         () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
     assertS3ExceptionStatusCode(412, firstException);

Review Comment:
   we've got a constant in org.apache.hadoop.fs.s3a.impl.InternalConstants for this number
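   For illustration, the assertion would then read roughly as below — assuming the constant in question is `SC_412_PRECONDITION_FAILED` (the exact name should be checked against `InternalConstants` before adopting this):
   ```java
   // assumed static import; verify the constant name in InternalConstants
   import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;

   // ...
   RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
       () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
   // the named constant documents which HTTP status is expected, no magic number
   assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
   ```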
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+  private Configuration conf;

Review Comment:
   cut

##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   import ordering is wrong... I'll give you the IDE settings for this
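   For reference, the grouping the S3A code generally follows is: `java.*`/`javax.*`, then other third-party imports, then `org.apache.*`, then static imports last, with a blank line between groups. A sketch of the reordered block for this test is below; the exact ordering inside each group should come from the IDE settings mentioned above.
   ```java
   import java.io.IOException;

   import org.junit.Test;
   import software.amazon.awssdk.services.s3.model.S3Exception;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FSDataOutputStream;
   import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
   import org.apache.hadoop.fs.s3a.S3ATestUtils;
   import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
   import org.apache.hadoop.io.IOUtils;

   import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
   // ...the remaining static imports from the file, kept together as the final group
   import static org.apache.hadoop.test.LambdaTestUtils.intercept;
   ```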
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -62,6 +62,7 @@
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;

Review Comment:
   move below line 67

##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag(
       Path path,
       byte[] data,
       String ifMatchTag) throws Exception {
-    FSDataOutputStreamBuilder builder = fs.createFile(path);
-    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
-    FSDataOutputStream stream = builder.create().build();
-    if (data != null && data.length > 0) {
-      stream.write(data);
-    }
-    stream.close();
-    IOUtils.closeStream(stream);
+    FSDataOutputStreamBuilder builder = fs.createFile(path);
+    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+    builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+    FSDataOutputStream stream = builder.create().build();

Review Comment:
   use try-with-resources here.
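   As an illustration of that suggestion, the helper could be restructured along these lines (a sketch only; it keeps the existing builder options and drops the now-redundant `IOUtils.closeStream()` call):
   ```java
   private static void createFileWithIfNoneMatchFlag(
       FileSystem fs,
       Path path,
       byte[] data,
       String ifMatchTag) throws Exception {
     FSDataOutputStreamBuilder builder = fs.createFile(path);
     builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
     builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
     // try-with-resources closes the stream exactly once, even if write() throws,
     // so the explicit close()/closeStream() pair goes away
     try (FSDataOutputStream stream = builder.create().build()) {
       if (data != null && data.length > 0) {
         stream.write(data);
       }
     }
   }
   ```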
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId,
     PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
         getOriginalDestKey(),
         0,
-        new PutObjectOptions(null, headers));
+        new PutObjectOptions(true,false, null, headers));

Review Comment:
   style nit: put a space after the comma. checkstyle will complain about this

##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
  */
 public final class PutObjectOptions {
+  /**
+   * Can the PUT operation skip marker deletion?
+   */
+  private final boolean keepMarkers;
+
+  /**
+   * Is this a conditional PUT operation
+   */
+  private final boolean conditionalPutEnabled;

Review Comment:
   given there's now an etag match too, maybe we should give this a name such as `noObjectOverwrite`; we can then add a new one after it, `etagOverwrite`, which will be a string
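   A sketch of what that renaming could look like; the javadoc wording, the `String` etag field, and the mapping to the HTTP conditions are illustrative, not part of the current patch:
   ```java
   /**
    * Fail the PUT if an object already exists at the destination?
    * Maps to an {@code If-None-Match: *} condition on the request.
    */
   private final boolean noObjectOverwrite;

   /**
    * Etag which must match for an overwrite to proceed; may be null.
    * Maps to an {@code If-Match} condition on the request.
    */
   private final String etagOverwrite;
   ```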
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -533,12 +534,19 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
   public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags) {
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions) {
+
     // a copy of the list is required, so that the AWS SDK doesn't
     // attempt to sort an unmodifiable list.
-    CompleteMultipartUploadRequest.Builder requestBuilder =
-        CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+    CompleteMultipartUploadRequest.Builder requestBuilder;
+    Map<String, String> optionHeaders = putOptions.getHeaders();
+    requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
         .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+    if (putOptions.isconditionalPutEnabled()) {
+      requestBuilder.overrideConfiguration(

Review Comment:
   this is from the first patch, isn't it?
   I'll have to look at my comments there

##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -26,6 +26,16 @@
  */
 public final class PutObjectOptions {
+  /**
+   * Can the PUT operation skip marker deletion?
+   */
+  private final boolean keepMarkers;
+
+  /**
+   * Is this a conditional PUT operation
+   */

Review Comment:
   add a ? at the end

##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+  private Configuration conf;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    removeBaseAndBucketOverrides(
+        conf,
+        MULTIPART_SIZE,
+        UPLOAD_PART_COUNT_LIMIT,
+        MIN_MULTIPART_THRESHOLD);
+    conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    conf = createConfiguration();
+    skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
+        "Skipping IfNoneMatch tests");
+  }
+
+  private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+    S3Exception s3Exception = (S3Exception) ex.getCause();
+
+    if (s3Exception.statusCode() != code) {
+      throw new AssertionError("Expected status code " + code + " from " + ex, ex);
+    }
+  }
+
+  protected String getBlockOutputBufferName() {
+    return FAST_UPLOAD_BUFFER_ARRAY;
+  }
+
+  /**
+   * Create a file using the PutIfMatch feature from S3
+   * @param fs filesystem
+   * @param path path to write
+   * @param data source dataset. Can be null
+   * @throws IOException on any problem
+   */
+  private static void createFileWithIfNoneMatchFlag(
+      FileSystem fs,
+      Path path,
+      byte[] data,
+      String ifMatchTag) throws Exception {
+    FSDataOutputStreamBuilder builder = fs.createFile(path);
+    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true");
+    builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag);
+    FSDataOutputStream stream = builder.create().build();
+    if (data != null && data.length > 0) {
+      stream.write(data);
+    }
+    stream.close();
+    IOUtils.closeStream(stream);
+  }
+
+  @Test
+  public void testPutIfAbsentConflict() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+    byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+    createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
+
+    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, firstException);
+
+    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, secondException);
+  }
+
+
+  @Test
+  public void testPutIfAbsentLargeFileConflict() throws Throwable {

Review Comment:
   I'd offered a way to test this without needing to create a large file for a multipart upload; we can work on this. Anything uploading 6 MB gets moved into the scale tests, so it doesn't get run as often.

##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java:
##########
@@ -39,15 +49,36 @@ public final class PutObjectOptions {
   /**
    * Constructor.
    * @param storageClass Storage class, if not null.
+   * @param conditionalPutEnabled Is this a conditional Put?
    * @param headers Headers; may be null.
    */
   public PutObjectOptions(
+      final boolean keepMarkers,

Review Comment:
   the keepMarkers option is something I've cut from trunk; it's come in here during rebase/merge work. Proposed:
   * you move the new argument to after the `headers` entry
   * remove the keepMarkers argument/field and all uses of it in this class.
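   Putting those two points together, the constructor the comment is asking for would look roughly like this. It is only a sketch: the parameter names come from the quoted javadoc, while the `@Nullable` annotations and the field assignments are assumptions about the existing class (and `conditionalPutEnabled` could become `noObjectOverwrite`, per the earlier naming suggestion).
   ```java
   /**
    * Constructor.
    * @param storageClass Storage class, if not null.
    * @param headers Headers; may be null.
    * @param conditionalPutEnabled Is this a conditional Put?
    */
   public PutObjectOptions(
       @Nullable final String storageClass,
       @Nullable final Map<String, String> headers,
       final boolean conditionalPutEnabled) {
     // no keepMarkers argument: that option has been removed from trunk
     this.storageClass = storageClass;
     this.headers = headers;
     this.conditionalPutEnabled = conditionalPutEnabled;
   }
   ```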
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]