steveloughran commented on code in PR #5481:
URL: https://github.com/apache/hadoop/pull/5481#discussion_r1162598087


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
    */
   private ArnResource accessPoint;
 
+  /**
+   * Is this S3A FS instance has multipart uploads enabled?

Review Comment:
   grammar nit: this should read
   "Is multipart upload enabled?"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java:
##########
@@ -1031,6 +1031,40 @@ public static long 
getMultipartSizeProperty(Configuration conf,
     return partSize;
   }
 
+  /**
+   * Validates the output stream configuration
+   * @param conf : configuration object for the given context
+   * @throws IOException : throws an IOException on config mismatch
+   */
+  public static void validateOutputStreamConfiguration(Configuration conf) 
throws IOException {
+    if(!checkDiskBuffer(conf)){
+      throw new IOException("Unable to create OutputStream with the given"
+          + " multipart upload and buffer configuration.");
+    }
+  }
+
+  /**
+   * Check whether the configuration for S3ABlockOutputStream is
+   * consistent or not. Multipart uploads allow all kinds of fast buffers to
+   * be supported. When the option is disabled only disk buffers are allowed to
+   * be used as the file size might be bigger than the buffer size that can be
+   * allocated.
+   * @param conf : configuration object for the given context
+   * @return true if the disk buffer and the multipart settings are supported
+   */
+  public static boolean checkDiskBuffer(Configuration conf) {
+    boolean isMultipartUploadEnabled = 
conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+        MULTIPART_UPLOAD_ENABLED_DEFAULT);
+    if (isMultipartUploadEnabled) {
+      return true;
+    } else if (!isMultipartUploadEnabled && conf.get(FAST_UPLOAD_BUFFER)

Review Comment:
   This can be simplified to
   ```
   return isMultipartUploadEnabled
       || FAST_UPLOAD_BUFFER_DISK.equals(conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
   ```
   The default passed to `conf.get()` is critical to prevent NPEs if the option is unset. Putting the constant first (`CONSTANT.equals(value)`) is even more rigorous.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java:
##########
@@ -199,7 +200,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
    */

Review Comment:
   Document the `@throws IOException`, stating when it is raised.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1854,7 +1863,8 @@ private FSDataOutputStream innerCreateFile(
         .withCSEEnabled(isCSEEnabled)
         .withPutOptions(putOptions)
         .withIOStatisticsAggregator(
-            
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
+            
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+            .withMultipartEnabled(isMultipartUploadEnabled);

Review Comment:
   nit: indentation. This should be aligned with the `.with` calls above.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);

Review Comment:
   Again, clear any per-bucket overrides anyone has set:
   ```
   removeBucketOverrides(getTestBucketName(conf), conf,
           S3A_COMMITTER_FACTORY_KEY,
           FS_S3A_COMMITTER_NAME,
           MULTIPART_UPLOADS_ENABLED);
     }
   ```
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java:
##########
@@ -460,7 +466,10 @@ public AbortMultipartUploadRequest 
newAbortMultipartUploadRequest(
   @Override
   public InitiateMultipartUploadRequest newMultipartUploadRequest(
       final String destKey,
-      @Nullable final PutObjectOptions options) {
+      @Nullable final PutObjectOptions options) throws IOException {
+    if (!isMultipartUploadEnabled) {
+      throw new IOException("Multipart uploads are disabled on the given 
filesystem.");

Review Comment:
   Throw a `PathIOException` that includes `destKey`; this gives a bit more detail.
   ```
   throw new PathIOException(destKey, "Multipart uploads are disabled");
   ```
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);

Review Comment:
   Tests need to make sure that they've removed any per-bucket settings, as such settings can trigger false failures. There's a method in `S3ATestUtils` to do this; `AbstractCommitITest` shows its use:
   ```
       removeBucketOverrides(getTestBucketName(conf), conf,
           MAGIC_COMMITTER_ENABLED,
           S3A_COMMITTER_FACTORY_KEY,
           FS_S3A_COMMITTER_NAME,
           MULTIPART_UPLOADS_ENABLED);
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to