jackye1995 commented on a change in pull request #4342:
URL: https://github.com/apache/iceberg/pull/4342#discussion_r829653214



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -128,36 +137,65 @@ public void deleteFile(String path) {
    */
   @Override
   public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
-    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
-    int numberOfFailedDeletions = 0;
-    for (String path : paths) {
-      S3URI location = new S3URI(path);
-      String bucket = location.bucket();
-      String objectKey = location.key();
-      Set<String> objectsInBucket = bucketToObjects.get(bucket);
-      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
-        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
-        numberOfFailedDeletions += failedDeletionsForBatch.size();
-        failedDeletionsForBatch.forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath));
-        bucketToObjects.removeAll(bucket);
+    if (deleteTags.isEmpty()) {
+      SetMultimap<String, String> bucketToObjects = Multimaps
+          .newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+      int numberOfFailedDeletions = 0;
+      for (String path : paths) {
+        S3URI location = new S3URI(path);
+        String bucket = location.bucket();
+        String objectKey = location.key();
+        Set<String> objectsInBucket = bucketToObjects.get(bucket);
+        if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+          List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+          numberOfFailedDeletions += failedDeletionsForBatch.size();
+          failedDeletionsForBatch
+              .forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath));
+          bucketToObjects.removeAll(bucket);
+        }
+        bucketToObjects.get(bucket).add(objectKey);
       }
-      bucketToObjects.get(bucket).add(objectKey);
-    }
 
-    // Delete the remainder
-    for (Map.Entry<String, Collection<String>> bucketToObjectsEntry : bucketToObjects.asMap().entrySet()) {
-      final String bucket = bucketToObjectsEntry.getKey();
-      final Collection<String> objects = bucketToObjectsEntry.getValue();
-      List<String> failedDeletions = deleteObjectsInBucket(bucket, objects);
-      failedDeletions.forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath));
-      numberOfFailedDeletions += failedDeletions.size();
-    }
+      // Delete the remainder
+      for (Map.Entry<String, Collection<String>> bucketToObjectsEntry : bucketToObjects.asMap()
+          .entrySet()) {
+        final String bucket = bucketToObjectsEntry.getKey();
+        final Collection<String> objects = bucketToObjectsEntry.getValue();
+        List<String> failedDeletions = deleteObjectsInBucket(bucket, objects);
+        failedDeletions
+            .forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath));
+        numberOfFailedDeletions += failedDeletions.size();
+      }
 
-    if (numberOfFailedDeletions > 0) {
-      throw new BulkDeletionFailureException(numberOfFailedDeletions);
+      if (numberOfFailedDeletions > 0) {
+        throw new BulkDeletionFailureException(numberOfFailedDeletions);
+      }
+    } else {
+      paths.forEach(this::doSoftDelete);
     }
   }
 
+  private void doSoftDelete(String path) {
+    S3URI location = new S3URI(path);
+    String bucket = location.bucket();
+    String objectKey = location.key();
+    GetObjectTaggingRequest getObjectTaggingRequest = GetObjectTaggingRequest.builder()
+        .bucket(bucket)
+        .key(objectKey)
+        .build();
+    GetObjectTaggingResponse getObjectTaggingResponse = client()
+        .getObjectTagging(getObjectTaggingRequest);
+    // Get existing tags, if any, and then add the delete tags
+    Set<Tag> tags = Sets.newHashSet(getObjectTaggingResponse.tagSet());
+    tags.addAll(deleteTags);
+    PutObjectTaggingRequest putObjectTaggingRequest = PutObjectTaggingRequest.builder()

Review comment:
       Yes, there is a potential risk of a race condition. However, I think it is fine for most "proper" uses of an Iceberg table: a file should be immutable from the time it is written until it is deleted, so the chance of someone concurrently modifying its tags is relatively low.
   
   In the worst case, a file will miss a tag but is still guaranteed to contain a delete tag, so it should still go through lifecycle management based on that delete tag.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -108,6 +115,15 @@ public OutputFile newOutputFile(String path) {
 
   @Override
   public void deleteFile(String path) {
+    if (awsProperties.s3DeleteTags() != null && !awsProperties.s3DeleteTags().isEmpty()) {
+      try {
+        doSoftDelete(path);
+      } catch (S3Exception e) {
+        LOG.warn("Failed to add delete tags: {}", path, e);
+      }
+      return;

Review comment:
       nit: newline after try block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -248,6 +248,15 @@
    */
   public static final String S3_WRITE_TAGS_PREFIX = "s3.write.tags.";
 
+  /**
+   * Used by {@link S3FileIO} to tag objects when deleting. To set, we can 
pass a catalog property.
+   * <p>
+   * For more details, see 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html

Review comment:
       it's a bit sudden to link this feature to lifecycle management. We 
should describe that, "when this config is set, objects are tagged with the 
configured key-value pairs instead of being hard-deleted. This is considered a 
soft-delete, because users are able to configure tag-based object lifecycle 
policy at bucket level to transition objects to different tiers"
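
To illustrate the suggested wording, a tag-based lifecycle rule at the bucket level might look like the following (the `iceberg-deleted`/`true` tag pair and the 7-day expiration are made-up examples, not values defined by this PR):

```json
{
  "Rules": [
    {
      "ID": "expire-soft-deleted-objects",
      "Filter": {
        "Tag": { "Key": "iceberg-deleted", "Value": "true" }
      },
      "Status": "Enabled",
      "Expiration": { "Days": 7 }
    }
  ]
}
```

Objects tagged by the soft delete then expire (or transition to another storage class) per the bucket policy, with no further action from Iceberg.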

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -155,6 +180,30 @@ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureExcept
     }
   }
 
+  private void doSoftDelete(String path) throws S3Exception {
+    S3URI location = new S3URI(path);
+    String bucket = location.bucket();
+    String objectKey = location.key();
+    GetObjectTaggingRequest getObjectTaggingRequest = GetObjectTaggingRequest.builder()
+        .bucket(bucket)
+        .key(objectKey)
+        .build();
+    GetObjectTaggingResponse getObjectTaggingResponse = client()
+        .getObjectTagging(getObjectTaggingRequest);
+    // Get existing tags, if any, and then add the delete tags
+    Set<Tag> tags = Sets.newHashSet();
+    if (getObjectTaggingResponse != null && getObjectTaggingResponse.hasTagSet()) {

Review comment:
       I don't think `getObjectTaggingResponse` would ever be null, we can at 
least trust AWS client with that :)

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -155,6 +180,30 @@ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureExcept
     }
   }
 
+  private void doSoftDelete(String path) throws S3Exception {
+    S3URI location = new S3URI(path);
+    String bucket = location.bucket();
+    String objectKey = location.key();
+    GetObjectTaggingRequest getObjectTaggingRequest = GetObjectTaggingRequest.builder()
+        .bucket(bucket)
+        .key(objectKey)
+        .build();
+    GetObjectTaggingResponse getObjectTaggingResponse = client()
+        .getObjectTagging(getObjectTaggingRequest);
+    // Get existing tags, if any, and then add the delete tags
+    Set<Tag> tags = Sets.newHashSet();
+    if (getObjectTaggingResponse != null && getObjectTaggingResponse.hasTagSet()) {
+      tags.addAll(getObjectTaggingResponse.tagSet());
+    }
+    tags.addAll(awsProperties.s3DeleteTags());

Review comment:
       nit: newline after if block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -125,6 +141,15 @@ public void deleteFile(String path) {
    */
   @Override
  public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
+    if (awsProperties.s3DeleteTags() != null && !awsProperties.s3DeleteTags().isEmpty()) {
+      Tasks.foreach(paths)
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((path, exc) -> LOG.warn("Failed to add delete tags: {}", path, exc))
+          .run(this::doSoftDelete);

Review comment:
       good point, we can follow what we did in https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java#L110-L122 for this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


