jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789152670



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;

Review comment:
       nit: `isEtagCheckEnabled` still uses the old name; it was not updated

##########
File path: aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
##########
@@ -77,14 +86,15 @@
   private final String newTmpDirectory = "/tmp/newStagingDirectory";
 
   private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
-      AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
+      AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(FIVE_MBS),

Review comment:
       this change does not seem necessary

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private void closeStream() throws IOException {
+    if (isEtagCheckEnabled) {
+      if (stagingFiles.size() > 0) {
+        stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+      }
+    }
+    stream.close();
+  }
+
+  private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+    LOG.error("S3 Request Failure: {}", requestString);
+    throw uncheckedIOException;

Review comment:
       +1, logging the full request also has security implications for many applications and should be avoided.
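
       One possible alternative, as a rough sketch only: attach just the bucket and key to the failure instead of the whole request string. The class and method names below (`SafeRequestLogging`, `describeRequest`, `withContext`) are hypothetical and not part of this PR or the AWS SDK; the sketch uses only the JDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of the PR: shows limited failure context.
public class SafeRequestLogging {

  // Builds a description from the bucket and key only, leaving out
  // headers, encryption settings, and permission grants.
  static String describeRequest(String bucket, String key) {
    return String.format("bucket=%s, key=%s", bucket, key);
  }

  // Wraps the failure with the limited context; the underlying
  // IOException is kept as the cause of the new exception.
  static UncheckedIOException withContext(UncheckedIOException e, String bucket, String key) {
    IOException cause = e.getCause();
    return new UncheckedIOException("S3 put failed for " + describeRequest(bucket, key), cause);
  }
}
```

       Whether even the key is safe to log depends on the application, so this is an assumption to check rather than a recommendation.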

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -373,4 +384,12 @@ public String dynamoDbTableName() {
   public void setDynamoDbTableName(String name) {
     this.dynamoDbTableName = name;
   }
+
+  public boolean isS3ChecksumEnabled() {
+    return this.isS3ChecksumEnabled;
+  }
+
+  public void setS3ChecksumEnabled(boolean eTagCheckEnabled) {

Review comment:
       nit: the parameter `eTagCheckEnabled` was not updated from the old name; maybe just `enabled`?
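
       A minimal sketch of the suggested naming, using a hypothetical stand-in class (`ChecksumSettings`) rather than the real `AwsProperties`; only the getter and setter from the hunk above are modeled.

```java
// Hypothetical stand-in for AwsProperties; only the checksum flag is modeled.
public class ChecksumSettings {
  private boolean isS3ChecksumEnabled;

  public boolean isS3ChecksumEnabled() {
    return this.isS3ChecksumEnabled;
  }

  // The parameter is simply `enabled`, so no trace of the old
  // ETag-based name remains in the signature.
  public void setS3ChecksumEnabled(boolean enabled) {
    this.isS3ChecksumEnabled = enabled;
  }
}
```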

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       I think we can name this variable `f` to avoid renaming it in the subsequent lines
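
       Roughly what that could look like, as a sketch only. `FileAndDigest` here is a hypothetical stand-in that models just the `file()` accessor seen in the diff, and `partNames` is an invented method used to give the lambda something to do.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class StagingFileNaming {

  // Hypothetical stand-in for the PR's FileAndDigest; only file() is modeled.
  static final class FileAndDigest {
    private final File file;

    FileAndDigest(File file) {
      this.file = file;
    }

    File file() {
      return file;
    }
  }

  // Collects the staging file names, unwrapping each entry into a local `f`.
  static List<String> partNames(List<FileAndDigest> stagingFiles) {
    List<String> names = new ArrayList<>();
    stagingFiles.forEach(fileAndDigest -> {
      // Keeping the local named `f` means existing lines that already
      // refer to `f` do not need to change in the diff.
      File f = fileAndDigest.file();
      names.add(f.getName());
    });
    return names;
  }
}
```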



