This is an automated email from the ASF dual-hosted git repository. amoghj pushed a commit to branch wip-s3sign-body in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 59cf16f1a9d3041306f3bc556bd33ccd122a478c Author: amogh-jahagirdar <[email protected]> AuthorDate: Mon Aug 21 16:35:44 2023 -0700 AWS: Update S3V4RestSignerClient to send body for DeleteObjects requests --- .../aws/s3/signer/S3V4RestSignerClient.java | 27 ++++++++++++++++ .../iceberg/aws/s3/signer/S3SignerServlet.java | 36 ++++++++++++++++++++++ .../iceberg/aws/s3/signer/TestS3RestSigner.java | 31 ++++++++++++++++++- 3 files changed, 93 insertions(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java index a334221a25..f257f5e663 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java @@ -21,6 +21,9 @@ package org.apache.iceberg.aws.s3.signer; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.util.Collections; @@ -55,6 +58,8 @@ import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.utils.IoUtils; @Value.Immutable public abstract class S3V4RestSignerClient @@ -286,6 +291,7 @@ public abstract class S3V4RestSignerClient .uri(request.getUri()) .headers(request.headers()) .properties(requestPropertiesSupplier().get()) + .body(body(request)) .build(); Key cacheKey = Key.from(remoteSigningRequest); @@ -328,6 +334,27 @@ public abstract class S3V4RestSignerClient return mutableRequest.build(); } + private String body(SdkHttpFullRequest request) { + if (shouldAddBody(request) && request.contentStreamProvider().isPresent()) { + try (InputStream is = request.contentStreamProvider().get().newStream()) { + return IoUtils.toUtf8String(is); + } catch (IOException e) { + throw new UncheckedIOException("Failed to set body for S3 sign request", e); + } + } + + return null; + } + + /** + * Only add body for DeleteObjectsRequest. Refer to + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax + */ + private boolean shouldAddBody(SdkHttpFullRequest request) { + return request.method() == SdkHttpMethod.POST + && request.rawQueryParameters().containsKey("delete"); + } + private void reconstructHeaders( Map<String, List<String>> signedAndUnsignedHeaders, SdkHttpFullRequest.Builder mutableRequest) { diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java index 6240efa2ad..7fc8097e25 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java @@ -32,11 +32,13 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.compress.utils.Lists; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpHeaders; import org.apache.iceberg.exceptions.RESTException; @@ -48,6 +50,7 @@ import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.ResourcePaths; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.signer.AwsS3V4Signer; @@ -78,10 +81,42 @@ public class S3SignerServlet extends HttpServlet { ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); private final ObjectMapper mapper; + private List<SignRequestValidator> s3SignRequestValidators = Lists.newArrayList(); + + /** + * SignRequestValidator is a wrapper class used for validating the actual contents of the + * S3SignRequest and thus verifying the contents the signer client puts in the request + */ + public static class SignRequestValidator { + private final Predicate<S3SignRequest> verifyExpectation; + private final Predicate<S3SignRequest> signRequestExpectation; + private final String assertMessage; + + public SignRequestValidator( + Predicate<S3SignRequest> requestValidatorPredicate, + Predicate<S3SignRequest> verifyExpectation, + String assertMessage) { + this.signRequestExpectation = requestValidatorPredicate; + this.verifyExpectation = verifyExpectation; + this.assertMessage = assertMessage; + } + + void validateRequest(S3SignRequest request) { + if (verifyExpectation.test(request)) { + Assert.assertTrue(assertMessage, signRequestExpectation.test(request)); + } + } + } + public S3SignerServlet(ObjectMapper mapper) { this.mapper = mapper; } + public S3SignerServlet(ObjectMapper mapper, List<SignRequestValidator> s3SignRequestValidators) { + this.mapper = mapper; + this.s3SignRequestValidators = s3SignRequestValidators; + } + @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) { execute(request, response); @@ -188,6 +223,7 @@ public class S3SignerServlet extends HttpServlet { S3SignRequest s3SignRequest = castRequest( S3SignRequest.class, mapper.readValue(request.getReader(), S3SignRequest.class)); + s3SignRequestValidators.forEach(validator -> validator.validateRequest(s3SignRequest)); S3SignResponse s3SignResponse = signRequest(s3SignRequest); if (CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) { // tell the client this can be cached diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java index 1e44e53318..c35c363caa 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java @@ -20,12 +20,14 @@ package org.apache.iceberg.aws.s3.signer; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.stream.Collectors; import org.apache.iceberg.aws.s3.MinioContainer; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.eclipse.jetty.server.Server; @@ -56,8 +58,11 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; @@ -148,7 +153,15 @@ public class TestS3RestSigner { } private static Server initHttpServer() throws Exception { - S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper()); + S3SignerServlet.SignRequestValidator deleteObjectsHasBody = + new S3SignerServlet.SignRequestValidator( + (s3SignRequest) -> + "post".equalsIgnoreCase(s3SignRequest.method()) + && s3SignRequest.uri().getQuery().contains("delete"), + (s3SignRequest) -> s3SignRequest.body() != null && !s3SignRequest.body().isEmpty(), + "Sign request for delete objects should have a request body"); + S3SignerServlet servlet = + new S3SignerServlet(S3ObjectMapper.mapper(), ImmutableList.of(deleteObjectsHasBody)); ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); servletContext.setContextPath("/"); @@ -177,6 +190,22 @@ public class TestS3RestSigner { PutObjectRequest.builder().bucket(BUCKET).key("some/key").build(), Paths.get("/etc/hosts")); } + @Test + public void validateDeleteObjects() { + Path sourcePath = Paths.get("/etc/hosts"); + s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key1").build(), sourcePath); + s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key2").build(), sourcePath); + + Delete objectsToDelete = + Delete.builder() + .objects( + ObjectIdentifier.builder().key("some/key1").build(), + ObjectIdentifier.builder().key("some/key2").build()) + .build(); + + s3.deleteObjects(DeleteObjectsRequest.builder().bucket(BUCKET).delete(objectsToDelete).build()); + } + @Test public void validateListPrefix() { s3.listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET).prefix("some/prefix/").build());
