jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r778592843



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -132,6 +145,11 @@ public void write(int b) throws IOException {
     }
 
     stream.write(b);
+    if (isEtagCheckEnabled) {
+      byte byteValue = ((Integer) b).byteValue();
+      currentPartMessageDigest.update(byteValue);
+      completeMessageDigest.update(byteValue);
+    }

Review comment:
       nit: newline after if block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();

Review comment:
       I think we can avoid changing this variable name, because the new element type `FileWithEtag` is already self-explanatory.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       Since this is only for S3, I think it makes more sense to use the `s3` prefix. Also, the naming convention for this type of config is `xxx.enabled` (see, for example, `SPARK_WRITE_PARTITIONED_FANOUT_ENABLED`).
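
       As a rough sketch of that naming suggestion: the class name, the property key `s3.etag-check.enabled`, and the default of `false` below are all hypothetical and only illustrate the `s3` prefix plus `.enabled` suffix pattern. They are not the names chosen in this PR.

```java
import java.util.Map;

// Hypothetical holder class; names and default are illustrative assumptions only.
final class S3EtagCheckConfigSketch {
  // "s3" prefix because the option only affects S3; ".enabled" suffix per the convention.
  static final String S3_ETAG_CHECK_ENABLED = "s3.etag-check.enabled";
  // Assumed default: the check stays off unless explicitly enabled.
  static final boolean S3_ETAG_CHECK_ENABLED_DEFAULT = false;

  // Reads the flag from a plain string property map, falling back to the default.
  static boolean isEtagCheckEnabled(Map<String, String> properties) {
    String value = properties.get(S3_ETAG_CHECK_ENABLED);
    return value == null ? S3_ETAG_CHECK_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }
}
```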

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +204,12 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    stagingFilesWithETags.add(new FileWithEtag(currentStagingFile, currentPartMessageDigest));

Review comment:
       nit: newline after try block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private static final String digestAlgorithm = "MD5";

Review comment:
       nit: static variables should be declared above all non-static ones

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       S3 has built-in checksum verification through the `Content-MD5` request header, so I think we don't need to check the response on the client side.
   
   See https://aws.amazon.com/premiumsupport/knowledge-center/data-integrity-s3/ for more details.
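
   For illustration, the `Content-MD5` header value is the base64-encoded MD5 digest of the request body (not the hex string that typically appears in an ETag). A minimal sketch using only the JDK follows; the class and method names are hypothetical and not part of this PR. With the AWS SDK v2, the resulting string would presumably be passed to the request builder's `contentMD5(...)` setter, so that S3 rejects the upload itself if the body does not match.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper; shows only how a Content-MD5 header value is derived.
final class ContentMd5Sketch {
  // Returns the base64-encoded MD5 digest of the given bytes.
  static String contentMd5(byte[] data) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      return Base64.getEncoder().encodeToString(md5.digest(data));
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform implementation is required to support MD5.
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(contentMd5("hello".getBytes(StandardCharsets.UTF_8)));
  }
}
```

   In `S3OutputStream`, the per-part digest that is already being accumulated could feed this value instead of being compared against the response ETag afterwards.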

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -153,6 +171,10 @@ public void write(byte[] b, int off, int len) throws IOException {
       int writeSize = multiPartSize - (int) stream.getCount();
 
       stream.write(b, relativeOffset, writeSize);
+      if (isEtagCheckEnabled) {
+        currentPartMessageDigest.update(b, relativeOffset, writeSize);
+        completeMessageDigest.update(b, relativeOffset, writeSize);
+      }

Review comment:
       nit: newline after if block
   



