This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 70c506ebad AWS: Implement SupportsRecoveryOperations mixin for S3FileIO (#10721)
70c506ebad is described below

commit 70c506ebad2dfc6d61b99c05efd59e884282bfa6
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Aug 8 14:35:17 2024 -0700

    AWS: Implement SupportsRecoveryOperations mixin for S3FileIO (#10721)
---
 .../org/apache/iceberg/aws/AwsIntegTestUtil.java   | 62 ++++++++++++++--------
 .../iceberg/aws/s3/TestS3FileIOIntegration.java    | 38 +++++++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   | 50 ++++++++++++++++-
 3 files changed, 128 insertions(+), 22 deletions(-)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index bbe062d5db..7e0ca6ed10 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
@@ -30,9 +31,12 @@ import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.ObjectVersion;
+import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
 import software.amazon.awssdk.services.s3control.S3ControlClient;
 import software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest;
 import software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
@@ -94,28 +96,46 @@ public class AwsIntegTestUtil {
   }
 
   public static void cleanS3Bucket(S3Client s3, String bucketName, String 
prefix) {
-    boolean hasContent = true;
-    while (hasContent) {
-      ListObjectsV2Response response =
-          s3.listObjectsV2(
-              
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build());
-      hasContent = response.hasContents();
-      if (hasContent) {
-        s3.deleteObjects(
-            DeleteObjectsRequest.builder()
-                .bucket(bucketName)
-                .delete(
-                    Delete.builder()
-                        .objects(
-                            response.contents().stream()
-                                .map(obj -> 
ObjectIdentifier.builder().key(obj.key()).build())
-                                .collect(Collectors.toList()))
-                        .build())
-                .build());
-      }
+    ListObjectVersionsIterable response =
+        s3.listObjectVersionsPaginator(
+            
ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build());
+    List<ObjectVersion> versionsToDelete = Lists.newArrayList();
+    int batchDeletionSize = 1000;
+    response.versions().stream()
+        .forEach(
+            version -> {
+              versionsToDelete.add(version);
+              if (versionsToDelete.size() == batchDeletionSize) {
+                deleteObjectVersions(s3, bucketName, versionsToDelete);
+                versionsToDelete.clear();
+              }
+            });
+
+    if (!versionsToDelete.isEmpty()) {
+      deleteObjectVersions(s3, bucketName, versionsToDelete);
     }
   }
 
+  private static void deleteObjectVersions(
+      S3Client s3, String bucket, List<ObjectVersion> objectVersions) {
+    s3.deleteObjects(
+        DeleteObjectsRequest.builder()
+            .bucket(bucket)
+            .delete(
+                Delete.builder()
+                    .objects(
+                        objectVersions.stream()
+                            .map(
+                                obj ->
+                                    ObjectIdentifier.builder()
+                                        .key(obj.key())
+                                        .versionId(obj.versionId())
+                                        .build())
+                            .collect(Collectors.toList()))
+                    .build())
+            .build());
+  }
+
   public static void cleanGlueCatalog(GlueClient glue, List<String> 
namespaces) {
     for (String namespace : namespaces) {
       try {
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
index 18abb82ce7..cacf048918 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
@@ -53,14 +53,17 @@ import 
software.amazon.awssdk.services.kms.model.ListAliasesRequest;
 import software.amazon.awssdk.services.kms.model.ListAliasesResponse;
 import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketVersioningStatus;
 import software.amazon.awssdk.services.s3.model.GetObjectAclRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectAclResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
 import software.amazon.awssdk.services.s3.model.Permission;
+import software.amazon.awssdk.services.s3.model.PutBucketVersioningRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.VersioningConfiguration;
 import software.amazon.awssdk.services.s3control.S3ControlClient;
 import software.amazon.awssdk.utils.ImmutableMap;
 import software.amazon.awssdk.utils.IoUtils;
@@ -106,6 +109,12 @@ public class TestS3FileIOIntegration {
     AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName);
     AwsIntegTestUtil.createAccessPoint(
         crossRegionS3Control, crossRegionAccessPointName, 
crossRegionBucketName);
+    s3.putBucketVersioning(
+        PutBucketVersioningRequest.builder()
+            .bucket(bucketName)
+            .versioningConfiguration(
+                
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
+            .build());
   }
 
   @AfterAll
@@ -445,6 +454,35 @@ public class TestS3FileIOIntegration {
             });
   }
 
+  @Test
+  public void testFileRecoveryHappyPath() throws Exception {
+    // Relies on versioning having been enabled on bucketName during class setup: the delete
+    // below then only hides the written data behind a delete marker instead of destroying it.
+    S3FileIO fileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
+    String location = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet");
+
+    write(fileIO, location);
+    fileIO.deleteFile(location);
+    assertThat(fileIO.newInputFile(location).exists()).isFalse();
+
+    // recovery must report success and make the file visible again
+    boolean recovered = fileIO.recoverFile(location);
+    assertThat(recovered).isTrue();
+    assertThat(fileIO.newInputFile(location).exists()).isTrue();
+  }
+
+  @Test
+  public void testFileRecoveryFailsToRecover() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
+    // Versioning is a bucket-wide setting shared with the other tests in this class (e.g.
+    // testFileRecoveryHappyPath needs it ENABLED), so it is restored even if this test fails;
+    // otherwise the outcome of other tests depends on execution order.
+    setBucketVersioning(BucketVersioningStatus.SUSPENDED);
+    try {
+      String filePath =
+          String.format("s3://%s/%s/%s", bucketName, prefix, "unversionedFile.parquet");
+      write(s3FileIO, filePath);
+      s3FileIO.deleteFile(filePath);
+      assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse();
+
+      // an object written while versioning is suspended keeps no older version to restore
+      assertThat(s3FileIO.recoverFile(filePath)).isFalse();
+    } finally {
+      setBucketVersioning(BucketVersioningStatus.ENABLED);
+    }
+  }
+
+  /** Applies the given versioning status to the shared test bucket. */
+  private void setBucketVersioning(BucketVersioningStatus status) {
+    s3.putBucketVersioning(
+        PutBucketVersioningRequest.builder()
+            .bucket(bucketName)
+            .versioningConfiguration(VersioningConfiguration.builder().status(status).build())
+            .build());
+  }
+
   private S3FileIOProperties getDeletionTestProperties() {
     S3FileIOProperties properties = new S3FileIOProperties();
     properties.setDeleteBatchSize(deletionBatchSize);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index dd13e13f01..f7d2da5eb9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.aws.s3;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -37,6 +39,7 @@ import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsRecoveryOperations;
 import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +55,7 @@ import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -61,10 +65,12 @@ import 
software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.ObjectVersion;
 import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.Tag;
 import software.amazon.awssdk.services.s3.model.Tagging;
+import 
software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
 
 /**
  * FileIO implementation backed by S3.
@@ -73,7 +79,7 @@ import software.amazon.awssdk.services.s3.model.Tagging;
  * schemes s3a, s3n, https are also treated as s3 file paths. Using this 
FileIO with other schemes
  * will result in {@link org.apache.iceberg.exceptions.ValidationException}.
  */
-public class S3FileIO implements CredentialSupplier, DelegateFileIO {
+public class S3FileIO implements CredentialSupplier, DelegateFileIO, 
SupportsRecoveryOperations {
   private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
@@ -420,4 +426,46 @@ public class S3FileIO implements CredentialSupplier, 
DelegateFileIO {
       }
     }
   }
+
+  /**
+   * Restores {@code path} to its most recently modified object version.
+   *
+   * @return true if the object is already current or was restored; false if the path has no
+   *     stored object version or the restoring copy failed
+   */
+  @Override
+  public boolean recoverFile(String path) {
+    S3URI location = new S3URI(path, s3FileIOProperties.bucketToAccessPointMapping());
+    ListObjectVersionsIterable response =
+        client()
+            .listObjectVersionsPaginator(
+                builder -> builder.bucket(location.bucket()).prefix(location.key()));
+
+    // ListObjectVersions only matches by prefix, so versions of other keys that merely start with
+    // this key (e.g. "a.parquet" vs "a.parquet.tmp") are listed too. They must be skipped:
+    // otherwise another object's version could be picked, reporting success without restoring
+    // this file. Delete markers are reported separately from versions(); once the file is deleted
+    // its marker is the latest entry, so restore the most recently modified object version.
+    Optional<ObjectVersion> recoverVersion =
+        response.versions().stream()
+            .filter(version -> version.key().equals(location.key()))
+            .max(Comparator.comparing(ObjectVersion::lastModified));
+
+    return recoverVersion.map(version -> recoverObject(version, location.bucket())).orElse(false);
+  }
+
+  /**
+   * Makes {@code version} the current version of its key by copying it onto the same key.
+   *
+   * @param version the object version to restore
+   * @param bucket the bucket holding the version
+   * @return true if the version is already current or the copy succeeded; false if the copy
+   *     request failed with an {@link SdkException}
+   */
+  private boolean recoverObject(ObjectVersion version, String bucket) {
+    if (!version.isLatest()) {
+      LOG.info("Attempting to recover object {}", version.key());
+      try {
+        // Copying the old version (instead of deleting the delete marker) means recovery does not
+        // need delete permissions.
+        // NOTE(review): a single CopyObject request is limited in source size by S3 (5 GB);
+        // larger files would need a multipart copy — confirm this is acceptable.
+        client()
+            .copyObject(
+                request ->
+                    request
+                        .sourceBucket(bucket)
+                        .sourceKey(version.key())
+                        .sourceVersionId(version.versionId())
+                        .destinationBucket(bucket)
+                        .destinationKey(version.key()));
+      } catch (SdkException e) {
+        LOG.warn("Failed to recover object {}", version.key(), e);
+        return false;
+      }
+    }
+
+    // A version that is already the latest needs no copy: the object is currently visible.
+    return true;
+  }
 }

Reply via email to