This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 70c506ebad AWS: Implement SupportsRecoveryOperations mixin for S3FileIO (#10721)
70c506ebad is described below
commit 70c506ebad2dfc6d61b99c05efd59e884282bfa6
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Aug 8 14:35:17 2024 -0700
AWS: Implement SupportsRecoveryOperations mixin for S3FileIO (#10721)
---
.../org/apache/iceberg/aws/AwsIntegTestUtil.java | 62 ++++++++++++++--------
.../iceberg/aws/s3/TestS3FileIOIntegration.java | 38 +++++++++++++
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 50 ++++++++++++++++-
3 files changed, 128 insertions(+), 22 deletions(-)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index bbe062d5db..7e0ca6ed10 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
@@ -30,9 +31,10 @@ import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.ObjectVersion;
+import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
import software.amazon.awssdk.services.s3control.S3ControlClient;
import software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest;
import software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
@@ -94,28 +96,46 @@ public class AwsIntegTestUtil {
}
public static void cleanS3Bucket(S3Client s3, String bucketName, String prefix) {
- boolean hasContent = true;
- while (hasContent) {
- ListObjectsV2Response response =
- s3.listObjectsV2(
-              ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build());
- hasContent = response.hasContents();
- if (hasContent) {
- s3.deleteObjects(
- DeleteObjectsRequest.builder()
- .bucket(bucketName)
- .delete(
- Delete.builder()
- .objects(
- response.contents().stream()
-                            .map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
- .collect(Collectors.toList()))
- .build())
- .build());
- }
+ ListObjectVersionsIterable response =
+ s3.listObjectVersionsPaginator(
+            ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build());
+ List<ObjectVersion> versionsToDelete = Lists.newArrayList();
+ int batchDeletionSize = 1000;
+ response.versions().stream()
+ .forEach(
+ version -> {
+ versionsToDelete.add(version);
+ if (versionsToDelete.size() == batchDeletionSize) {
+ deleteObjectVersions(s3, bucketName, versionsToDelete);
+ versionsToDelete.clear();
+ }
+ });
+
+ if (!versionsToDelete.isEmpty()) {
+ deleteObjectVersions(s3, bucketName, versionsToDelete);
}
}
+ private static void deleteObjectVersions(
+ S3Client s3, String bucket, List<ObjectVersion> objectVersions) {
+ s3.deleteObjects(
+ DeleteObjectsRequest.builder()
+ .bucket(bucket)
+ .delete(
+ Delete.builder()
+ .objects(
+ objectVersions.stream()
+ .map(
+ obj ->
+ ObjectIdentifier.builder()
+ .key(obj.key())
+ .versionId(obj.versionId())
+ .build())
+ .collect(Collectors.toList()))
+ .build())
+ .build());
+ }
+
public static void cleanGlueCatalog(GlueClient glue, List<String>
namespaces) {
for (String namespace : namespaces) {
try {
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
index 18abb82ce7..cacf048918 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
@@ -53,14 +53,17 @@ import software.amazon.awssdk.services.kms.model.ListAliasesRequest;
import software.amazon.awssdk.services.kms.model.ListAliasesResponse;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketVersioningStatus;
import software.amazon.awssdk.services.s3.model.GetObjectAclRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAclResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Permission;
+import software.amazon.awssdk.services.s3.model.PutBucketVersioningRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.VersioningConfiguration;
import software.amazon.awssdk.services.s3control.S3ControlClient;
import software.amazon.awssdk.utils.ImmutableMap;
import software.amazon.awssdk.utils.IoUtils;
@@ -106,6 +109,12 @@ public class TestS3FileIOIntegration {
AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName);
AwsIntegTestUtil.createAccessPoint(
crossRegionS3Control, crossRegionAccessPointName,
crossRegionBucketName);
+ s3.putBucketVersioning(
+ PutBucketVersioningRequest.builder()
+ .bucket(bucketName)
+ .versioningConfiguration(
+                VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
+ .build());
}
@AfterAll
@@ -445,6 +454,35 @@ public class TestS3FileIOIntegration {
});
}
+ @Test
+ public void testFileRecoveryHappyPath() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
+    String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet");
+ write(s3FileIO, filePath);
+ s3FileIO.deleteFile(filePath);
+ assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse();
+
+ assertThat(s3FileIO.recoverFile(filePath)).isTrue();
+ assertThat(s3FileIO.newInputFile(filePath).exists()).isTrue();
+ }
+
+ @Test
+ public void testFileRecoveryFailsToRecover() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
+ s3.putBucketVersioning(
+ PutBucketVersioningRequest.builder()
+ .bucket(bucketName)
+ .versioningConfiguration(
+                VersioningConfiguration.builder().status(BucketVersioningStatus.SUSPENDED).build())
+ .build());
+    String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "unversionedFile.parquet");
+ write(s3FileIO, filePath);
+ s3FileIO.deleteFile(filePath);
+ assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse();
+
+ assertThat(s3FileIO.recoverFile(filePath)).isFalse();
+ }
+
private S3FileIOProperties getDeletionTestProperties() {
S3FileIOProperties properties = new S3FileIOProperties();
properties.setDeleteBatchSize(deletionBatchSize);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index dd13e13f01..f7d2da5eb9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.aws.s3;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -37,6 +39,7 @@ import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsRecoveryOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +55,7 @@ import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -61,10 +65,12 @@ import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
+import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
/**
* FileIO implementation backed by S3.
@@ -73,7 +79,7 @@ import software.amazon.awssdk.services.s3.model.Tagging;
* schemes s3a, s3n, https are also treated as s3 file paths. Using this
FileIO with other schemes
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
-public class S3FileIO implements CredentialSupplier, DelegateFileIO {
-public class S3FileIO implements CredentialSupplier, DelegateFileIO {
+public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRecoveryOperations {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
@@ -420,4 +426,46 @@ public class S3FileIO implements CredentialSupplier,
DelegateFileIO {
}
}
}
+
+ @Override
+ public boolean recoverFile(String path) {
+    S3URI location = new S3URI(path, s3FileIOProperties.bucketToAccessPointMapping());
+ ListObjectVersionsIterable response =
+ client()
+ .listObjectVersionsPaginator(
+                builder -> builder.bucket(location.bucket()).prefix(location.key()));
+
+ // Recover to the last modified version, not isLatest,
+ // since isLatest is true for deletion markers.
+ Optional<ObjectVersion> recoverVersion =
+        response.versions().stream().max(Comparator.comparing(ObjectVersion::lastModified));
+
+    return recoverVersion.map(version -> recoverObject(version, location.bucket())).orElse(false);
+ }
+
+ private boolean recoverObject(ObjectVersion version, String bucket) {
+ if (version.isLatest()) {
+ return true;
+ }
+
+ LOG.info("Attempting to recover object {}", version.key());
+ try {
+ // Perform a copy instead of deleting the delete marker
+ // so that recovery does not rely on delete permissions
+ client()
+ .copyObject(
+ builder ->
+ builder
+ .sourceBucket(bucket)
+ .sourceKey(version.key())
+ .sourceVersionId(version.versionId())
+ .destinationBucket(bucket)
+ .destinationKey(version.key()));
+ } catch (SdkException e) {
+ LOG.warn("Failed to recover object {}", version.key(), e);
+ return false;
+ }
+
+ return true;
+ }
}