This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch wip-s3sign-body
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 59cf16f1a9d3041306f3bc556bd33ccd122a478c
Author: amogh-jahagirdar <[email protected]>
AuthorDate: Mon Aug 21 16:35:44 2023 -0700

    AWS: Update S3V4RestSignerClient to send body for DeleteObjects requests
---
 .../aws/s3/signer/S3V4RestSignerClient.java        | 27 ++++++++++++++++
 .../iceberg/aws/s3/signer/S3SignerServlet.java     | 36 ++++++++++++++++++++++
 .../iceberg/aws/s3/signer/TestS3RestSigner.java    | 31 ++++++++++++++++++-
 3 files changed, 93 insertions(+), 1 deletion(-)

diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
index a334221a25..f257f5e663 100644
--- 
a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
+++ 
b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.aws.s3.signer;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Collections;
@@ -55,6 +58,8 @@ import 
software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams;
 import software.amazon.awssdk.core.checksums.SdkChecksum;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.utils.IoUtils;
 
 @Value.Immutable
 public abstract class S3V4RestSignerClient
@@ -286,6 +291,7 @@ public abstract class S3V4RestSignerClient
             .uri(request.getUri())
             .headers(request.headers())
             .properties(requestPropertiesSupplier().get())
+            .body(body(request))
             .build();
 
     Key cacheKey = Key.from(remoteSigningRequest);
@@ -328,6 +334,27 @@ public abstract class S3V4RestSignerClient
     return mutableRequest.build();
   }
 
+  private String body(SdkHttpFullRequest request) {
+    if (shouldAddBody(request) && request.contentStreamProvider().isPresent()) 
{
+      try (InputStream is = request.contentStreamProvider().get().newStream()) 
{
+        return IoUtils.toUtf8String(is);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to set body for S3 sign 
request", e);
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Only add body for DeleteObjectsRequest. Refer to
+   * 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax
+   */
+  private boolean shouldAddBody(SdkHttpFullRequest request) {
+    return request.method() == SdkHttpMethod.POST
+        && request.rawQueryParameters().containsKey("delete");
+  }
+
   private void reconstructHeaders(
       Map<String, List<String>> signedAndUnsignedHeaders,
       SdkHttpFullRequest.Builder mutableRequest) {
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
index 6240efa2ad..7fc8097e25 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
@@ -32,11 +32,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.iceberg.exceptions.RESTException;
@@ -48,6 +50,7 @@ import org.apache.iceberg.rest.RESTUtil;
 import org.apache.iceberg.rest.ResourcePaths;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
@@ -78,10 +81,42 @@ public class S3SignerServlet extends HttpServlet {
       ImmutableMap.of(HttpHeaders.CONTENT_TYPE, 
ContentType.APPLICATION_JSON.getMimeType());
   private final ObjectMapper mapper;
 
+  private List<SignRequestValidator> s3SignRequestValidators = 
Lists.newArrayList();
+
+  /**
+   * SignRequestValidator is a wrapper class used for validating the actual 
contents of the
+   * S3SignRequest and thus verifying the contents the signer client puts in 
the request
+   */
+  public static class SignRequestValidator {
+    private final Predicate<S3SignRequest> verifyExpectation;
+    private final Predicate<S3SignRequest> signRequestExpectation;
+    private final String assertMessage;
+
+    public SignRequestValidator(
+        Predicate<S3SignRequest> requestValidatorPredicate,
+        Predicate<S3SignRequest> verifyExpectation,
+        String assertMessage) {
+      this.signRequestExpectation = requestValidatorPredicate;
+      this.verifyExpectation = verifyExpectation;
+      this.assertMessage = assertMessage;
+    }
+
+    void validateRequest(S3SignRequest request) {
+      if (verifyExpectation.test(request)) {
+        Assert.assertTrue(assertMessage, signRequestExpectation.test(request));
+      }
+    }
+  }
+
   public S3SignerServlet(ObjectMapper mapper) {
     this.mapper = mapper;
   }
 
+  public S3SignerServlet(ObjectMapper mapper, List<SignRequestValidator> 
s3SignRequestValidators) {
+    this.mapper = mapper;
+    this.s3SignRequestValidators = s3SignRequestValidators;
+  }
+
   @Override
   protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
     execute(request, response);
@@ -188,6 +223,7 @@ public class S3SignerServlet extends HttpServlet {
         S3SignRequest s3SignRequest =
             castRequest(
                 S3SignRequest.class, mapper.readValue(request.getReader(), 
S3SignRequest.class));
+        s3SignRequestValidators.forEach(validator -> 
validator.validateRequest(s3SignRequest));
         S3SignResponse s3SignResponse = signRequest(s3SignRequest);
         if 
(CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) {
           // tell the client this can be cached
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
index 1e44e53318..c35c363caa 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
@@ -20,12 +20,14 @@ package org.apache.iceberg.aws.s3.signer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.iceberg.aws.s3.MinioContainer;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.eclipse.jetty.server.Server;
@@ -56,8 +58,11 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
@@ -148,7 +153,15 @@ public class TestS3RestSigner {
   }
 
   private static Server initHttpServer() throws Exception {
-    S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper());
+    S3SignerServlet.SignRequestValidator deleteObjectsHasBody =
+        new S3SignerServlet.SignRequestValidator(
+            (s3SignRequest) ->
+                "post".equalsIgnoreCase(s3SignRequest.method())
+                    && s3SignRequest.uri().getQuery().contains("delete"),
+            (s3SignRequest) -> s3SignRequest.body() != null && 
!s3SignRequest.body().isEmpty(),
+            "Sign request for delete objects should have a request body");
+    S3SignerServlet servlet =
+        new S3SignerServlet(S3ObjectMapper.mapper(), 
ImmutableList.of(deleteObjectsHasBody));
     ServletContextHandler servletContext =
         new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
     servletContext.setContextPath("/");
@@ -177,6 +190,22 @@ public class TestS3RestSigner {
         PutObjectRequest.builder().bucket(BUCKET).key("some/key").build(), 
Paths.get("/etc/hosts"));
   }
 
+  @Test
+  public void validateDeleteObjects() {
+    Path sourcePath = Paths.get("/etc/hosts");
+    
s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key1").build(),
 sourcePath);
+    
s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key2").build(),
 sourcePath);
+
+    Delete objectsToDelete =
+        Delete.builder()
+            .objects(
+                ObjectIdentifier.builder().key("some/key1").build(),
+                ObjectIdentifier.builder().key("some/key2").build())
+            .build();
+
+    
s3.deleteObjects(DeleteObjectsRequest.builder().bucket(BUCKET).delete(objectsToDelete).build());
+  }
+
   @Test
   public void validateListPrefix() {
     
s3.listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET).prefix("some/prefix/").build());

Reply via email to