This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 258c18419b AWS: Update S3V4RestSignerClient to send body for
DeleteObjects requests (#8365)
258c18419b is described below
commit 258c18419b4cdc033f710ce1f3de4ebbb6e7bf23
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Aug 25 10:55:52 2023 -0700
AWS: Update S3V4RestSignerClient to send body for DeleteObjects requests
(#8365)
---
.../aws/s3/signer/S3V4RestSignerClient.java | 26 ++++++++++++++++
.../iceberg/aws/s3/signer/S3SignerServlet.java | 36 ++++++++++++++++++++++
.../iceberg/aws/s3/signer/TestS3RestSigner.java | 31 ++++++++++++++++++-
3 files changed, 92 insertions(+), 1 deletion(-)
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
index a334221a25..99f1588f0c 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
+++
b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.aws.s3.signer;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
@@ -55,6 +57,8 @@ import
software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams;
import software.amazon.awssdk.core.checksums.SdkChecksum;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.utils.IoUtils;
@Value.Immutable
public abstract class S3V4RestSignerClient
@@ -286,6 +290,7 @@ public abstract class S3V4RestSignerClient
.uri(request.getUri())
.headers(request.headers())
.properties(requestPropertiesSupplier().get())
+ .body(bodyAsString(request))
.build();
Key cacheKey = Key.from(remoteSigningRequest);
@@ -328,6 +333,27 @@ public abstract class S3V4RestSignerClient
return mutableRequest.build();
}
+  /**
+   * Extracts the request payload as a UTF-8 string, but only for DeleteObjects requests. Any other
+   * request, a request without a content stream, or a payload that cannot be read yields null. See
+   * https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax
+   */
+  private String bodyAsString(SdkHttpFullRequest request) {
+    // nothing to forward unless this is a DeleteObjects call that actually carries a payload
+    if (!isDeleteObjectsRequest(request) || !request.contentStreamProvider().isPresent()) {
+      return null;
+    }
+
+    try (InputStream payload = request.contentStreamProvider().get().newStream()) {
+      return IoUtils.toUtf8String(payload);
+    } catch (IOException e) {
+      // best effort: the sign request is still sent, just without a body
+      LOG.debug("Failed to determine body for S3 sign request", e);
+      return null;
+    }
+  }
+
+  /**
+   * A request is treated as DeleteObjects when it is a POST whose raw query parameters contain the
+   * "delete" key.
+   */
+  private boolean isDeleteObjectsRequest(SdkHttpFullRequest request) {
+    if (request.method() != SdkHttpMethod.POST) {
+      return false;
+    }
+
+    return request.rawQueryParameters().containsKey("delete");
+  }
+
private void reconstructHeaders(
Map<String, List<String>> signedAndUnsignedHeaders,
SdkHttpFullRequest.Builder mutableRequest) {
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
index 6240efa2ad..bc9fb44988 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws.s3.signer;
import static java.lang.String.format;
import static org.apache.iceberg.rest.RESTCatalogAdapter.castRequest;
import static org.apache.iceberg.rest.RESTCatalogAdapter.castResponse;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStreamReader;
@@ -32,6 +33,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.http.HttpServlet;
@@ -41,6 +43,7 @@ import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
@@ -78,10 +81,42 @@ public class S3SignerServlet extends HttpServlet {
ImmutableMap.of(HttpHeaders.CONTENT_TYPE,
ContentType.APPLICATION_JSON.getMimeType());
private final ObjectMapper mapper;
+ private List<SignRequestValidator> s3SignRequestValidators =
Lists.newArrayList();
+
+  /**
+   * SignRequestValidator is a wrapper class used for validating the contents of the S3SignRequest
+   * and thus verifying the behavior of the client during testing.
+   *
+   * <p>The expectation is only asserted for requests accepted by the matcher; requests the matcher
+   * rejects are not checked at all.
+   */
+  public static class SignRequestValidator {
+    private final Predicate<S3SignRequest> requestMatcher;
+    private final Predicate<S3SignRequest> requestExpectation;
+    private final String assertMessage;
+
+    /**
+     * The matcher is the first argument and the expectation the second. Both share the same type,
+     * so passing them in the wrong order compiles but silently inverts the check (the expectation
+     * would then decide which requests are inspected).
+     *
+     * @param requestMatcher selects the sign requests this validator applies to
+     * @param requestExpectation condition that every matched request must satisfy
+     * @param assertMessage description reported when a matched request fails the expectation
+     */
+    public SignRequestValidator(
+        Predicate<S3SignRequest> requestMatcher,
+        Predicate<S3SignRequest> requestExpectation,
+        String assertMessage) {
+      this.requestMatcher = requestMatcher;
+      this.requestExpectation = requestExpectation;
+      this.assertMessage = assertMessage;
+    }
+
+    /** Asserts the expectation against {@code request} if, and only if, the matcher accepts it. */
+    void validateRequest(S3SignRequest request) {
+      if (requestMatcher.test(request)) {
+        assertThat(requestExpectation.test(request)).as(assertMessage).isTrue();
+      }
+    }
+  }
+
public S3SignerServlet(ObjectMapper mapper) {
this.mapper = mapper;
}
+  /**
+   * Creates a servlet that additionally applies the given validators to the sign requests it
+   * receives. The list is stored as passed in, without copying.
+   */
+  public S3SignerServlet(ObjectMapper mapper, List<SignRequestValidator> s3SignRequestValidators) {
+    this(mapper);
+    this.s3SignRequestValidators = s3SignRequestValidators;
+  }
+
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse
response) {
execute(request, response);
@@ -188,6 +223,7 @@ public class S3SignerServlet extends HttpServlet {
S3SignRequest s3SignRequest =
castRequest(
S3SignRequest.class, mapper.readValue(request.getReader(),
S3SignRequest.class));
+ s3SignRequestValidators.forEach(validator ->
validator.validateRequest(s3SignRequest));
S3SignResponse s3SignResponse = signRequest(s3SignRequest);
if
(CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) {
// tell the client this can be cached
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
index 1e44e53318..67a5d423b8 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
@@ -20,12 +20,14 @@ package org.apache.iceberg.aws.s3.signer;
import static org.assertj.core.api.Assertions.assertThat;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.s3.MinioContainer;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.eclipse.jetty.server.Server;
@@ -56,8 +58,11 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -148,7 +153,15 @@ public class TestS3RestSigner {
}
private static Server initHttpServer() throws Exception {
- S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper());
+ S3SignerServlet.SignRequestValidator deleteObjectsWithBody =
+ new S3SignerServlet.SignRequestValidator(
+ (s3SignRequest) ->
+ "post".equalsIgnoreCase(s3SignRequest.method())
+ && s3SignRequest.uri().getQuery().contains("delete"),
+ (s3SignRequest) -> s3SignRequest.body() != null &&
!s3SignRequest.body().isEmpty(),
+ "Sign request for delete objects should have a request body");
+ S3SignerServlet servlet =
+ new S3SignerServlet(S3ObjectMapper.mapper(),
ImmutableList.of(deleteObjectsWithBody));
ServletContextHandler servletContext =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContext.setContextPath("/");
@@ -177,6 +190,22 @@ public class TestS3RestSigner {
PutObjectRequest.builder().bucket(BUCKET).key("some/key").build(),
Paths.get("/etc/hosts"));
}
+  @Test
+  public void validateDeleteObjects() {
+    // upload two objects first so the bulk delete below has something to remove
+    Path sourcePath = Paths.get("/etc/hosts");
+    PutObjectRequest putFirst = PutObjectRequest.builder().bucket(BUCKET).key("some/key1").build();
+    PutObjectRequest putSecond = PutObjectRequest.builder().bucket(BUCKET).key("some/key2").build();
+    s3.putObject(putFirst, sourcePath);
+    s3.putObject(putSecond, sourcePath);
+
+    ObjectIdentifier firstKey = ObjectIdentifier.builder().key("some/key1").build();
+    ObjectIdentifier secondKey = ObjectIdentifier.builder().key("some/key2").build();
+    DeleteObjectsRequest deleteRequest =
+        DeleteObjectsRequest.builder()
+            .bucket(BUCKET)
+            .delete(Delete.builder().objects(firstKey, secondKey).build())
+            .build();
+
+    s3.deleteObjects(deleteRequest);
+  }
+
@Test
public void validateListPrefix() {
s3.listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET).prefix("some/prefix/").build());