rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789133077



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (isEtagCheckEnabled) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  logS3RequestAndThrow(uncheckedIOException, uploadRequest.toString());
+                }
                 return CompletedPart.builder().eTag(response.eTag()).partNumber(uploadRequest.partNumber()).build();
               },
               executorService
           ).whenComplete((result, thrown) -> {
             try {
-              Files.deleteIfExists(f.toPath());
+              Files.deleteIfExists(file.toPath());
             } catch (IOException e) {
-              LOG.warn("Failed to delete staging file: {}", f, e);
+              LOG.warn("Failed to delete staging file: {}", file, e);
             }
 
             if (thrown != null) {
               LOG.error("Failed to upload part: {}", uploadRequest, thrown);
               abortUpload();
             }
           });
-
-          multiPartMap.put(f, future);
+          multiPartMap.put(file, future);

Review comment:
       Can you revert the unnecessary whitespace change here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to