SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788266424



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +208,13 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);

Review comment:
       Yeah, since there are public methods that call `newStream` and only declare `IOException`, I wanted to avoid changing their signatures.
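The wrap-and-rethrow pattern described in this reply can be sketched as follows. This is a minimal illustration, not the PR's code: `DigestFactorySketch` and `newPartDigest` are hypothetical names, and only `MessageDigest.getInstance`, `NoSuchAlgorithmException`, and `IOException` come from the JDK. Wrapping the checked `NoSuchAlgorithmException` in an `IOException` lets callers that only declare `throws IOException` (such as `newStream`) compile unchanged.

```java
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class DigestFactorySketch {

  // Hypothetical helper mirroring the diff: return a fresh digest when the ETag check
  // is enabled, otherwise null, without widening the caller's throws clause.
  static MessageDigest newPartDigest(boolean isEtagCheckEnabled, String digestAlgorithm)
      throws IOException {
    if (!isEtagCheckEnabled) {
      return null;
    }
    try {
      return MessageDigest.getInstance(digestAlgorithm);
    } catch (NoSuchAlgorithmException e) {
      // Wrap the checked exception so methods declaring only IOException need no change.
      throw new IOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(newPartDigest(true, "MD5").getAlgorithm()); // prints MD5
    System.out.println(newPartDigest(false, "MD5")); // prints null
    try {
      newPartDigest(true, "NOT-A-REAL-ALGORITHM");
    } catch (IOException e) {
      System.out.println(e.getCause() instanceof NoSuchAlgorithmException); // prints true
    }
  }
}
```

Every Java platform implementation is required to support MD5, so with that algorithm the catch branch mainly guards against a misconfigured algorithm name.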

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +421,22 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileWithEtag {
+    private final File file;
+    private final MessageDigest eTag;

Review comment:
       I think we can do that, though it will make the logic a bit more complicated, since we close the stream in multiple places. Will update it.
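One way to keep each staging file paired with its digest, as the `FileWithEtag` class in the diff does, is to update the digest as a side effect of writing. The sketch below assumes that approach using the JDK's `DigestOutputStream`; `FileWithDigest`, `openStream`, and `contentMd5` are hypothetical names, not the PR's API. It also shows the value S3 expects in `Content-MD5`: the Base64 encoding of the raw 16-byte MD5 digest, which is what `BinaryUtils.toBase64(...)` produces in the diff.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical holder pairing a staging file with the digest of the bytes written to it.
final class FileWithDigest {
  private final File file;
  private final MessageDigest digest;

  FileWithDigest(File file, MessageDigest digest) {
    this.file = file;
    this.digest = digest;
  }

  File file() {
    return file;
  }

  // Every write through this stream also updates the digest.
  OutputStream openStream() throws IOException {
    return new DigestOutputStream(new FileOutputStream(file), digest);
  }

  // Base64 of the raw MD5 bytes. MessageDigest.digest() resets the digest, so call once per part.
  String contentMd5() {
    return Base64.getEncoder().encodeToString(digest.digest());
  }
}

class FileWithDigestDemo {
  public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
    File tmp = File.createTempFile("s3fileio-", ".tmp");
    tmp.deleteOnExit();
    FileWithDigest part = new FileWithDigest(tmp, MessageDigest.getInstance("MD5"));
    try (OutputStream out = part.openStream()) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println(part.file().length()); // prints 5
    System.out.println(part.contentMd5()); // prints XUFAKrxLKna5cZ2REBfFkg==
  }
}
```

Because the digest is only final once all bytes for a part have been written, it should be read after the stream feeding it has been closed.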

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +254,49 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
               .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .contentLength(f.file().length());

Review comment:
       Good point, updating.
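Wrapping `File` in a holder is why every comparison in this hunk now goes through `f.file()`: assuming the holder does not override `equals`, it never equals a bare `File`, so without unwrapping the current-file filter would exclude nothing and `multiPartMap.containsKey(f)` would never match. The selection logic can be sketched in isolation as below; `StagedPart`, `partsToUpload`, and the string file names are hypothetical stand-ins for the real staging files and multipart map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartSelectionSketch {
  // Hypothetical stand-in for a staging file paired with its digest; only the name matters here.
  static final class StagedPart {
    private final String file;

    StagedPart(String file) {
      this.file = file;
    }

    String file() {
      return file;
    }
  }

  // Returns "file#partNumber" for each part to upload now: skip the file still being written
  // (unless the stream is closed) and skip files already in the multipart map.
  // Part numbers are 1-based positions in the staging list.
  static List<String> partsToUpload(List<StagedPart> stagingFiles, String currentStagingFile,
      boolean closed, Map<String, ?> multiPartMap) {
    return stagingFiles.stream()
        .filter(f -> closed || !f.file().equals(currentStagingFile))
        .filter(f -> !multiPartMap.containsKey(f.file()))
        .map(f -> f.file() + "#" + (stagingFiles.indexOf(f) + 1))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<StagedPart> staged = new ArrayList<>();
    staged.add(new StagedPart("a.tmp"));
    staged.add(new StagedPart("b.tmp"));
    staged.add(new StagedPart("c.tmp"));
    Map<String, String> uploaded = new HashMap<>();
    uploaded.put("a.tmp", "etag-a");
    System.out.println(partsToUpload(staged, "c.tmp", false, uploaded)); // prints [b.tmp#2]
    System.out.println(partsToUpload(staged, "c.tmp", true, uploaded)); // prints [b.tmp#2, c.tmp#3]
  }
}
```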

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +356,32 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        checkProtocolException(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private static void checkProtocolException(UncheckedIOException uncheckedIOException, String requestString) {
+    if (uncheckedIOException.getCause() instanceof ProtocolException) {
+      LOG.error("S3 Request Failure: {}", requestString);
+      throw uncheckedIOException;

Review comment:
       I initially wanted to log only when the cause is a `ProtocolException`, but now I think it is better to log regardless of the cause. Updating to that.
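The behavior settled on here (log the failed request whatever the cause, then rethrow) can be sketched as a small wrapper. The helper name `callAndLogFailure` and the `Consumer<String>` standing in for the SLF4J `LOG` are assumptions made to keep the example self-contained; the original code logs with `LOG.error("S3 Request Failure: {}", requestString)`.

```java
import java.io.UncheckedIOException;
import java.net.ProtocolException;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RequestFailureLoggingSketch {
  // Hypothetical helper: run an S3 call; on UncheckedIOException, log the request description
  // regardless of the cause type, then rethrow the original exception unchanged.
  static <T> T callAndLogFailure(Supplier<T> call, String requestString, Consumer<String> log) {
    try {
      return call.get();
    } catch (UncheckedIOException e) {
      log.accept("S3 Request Failure: " + requestString + " (cause: " + e.getCause() + ")");
      throw e;
    }
  }

  public static void main(String[] args) {
    StringBuilder logged = new StringBuilder();
    try {
      callAndLogFailure(
          () -> { throw new UncheckedIOException(new ProtocolException("bad digest")); },
          "PutObjectRequest(Bucket=b, Key=k)",
          logged::append);
    } catch (UncheckedIOException e) {
      System.out.println(e.getCause() instanceof ProtocolException); // prints true
    }
    System.out.println(logged);
  }
}
```

Rethrowing the same `UncheckedIOException` keeps the original stack trace and cause intact for callers; the log line only adds the request description.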




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
