This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 50863d7c02 Add support for SigV4 request signing to REST Catalog 
requests (#6951)
50863d7c02 is described below

commit 50863d7c028ea36c4cfb5d857f1324f7f978e579
Author: Daniel Weeks <[email protected]>
AuthorDate: Mon Mar 6 15:22:57 2023 -0800

    Add support for SigV4 request signing to REST Catalog requests (#6951)
---
 .../java/org/apache/iceberg/aws/AwsProperties.java |  68 +++++++++
 .../org/apache/iceberg/aws/RESTSigV4Signer.java    | 157 +++++++++++++++++++++
 .../apache/iceberg/aws/TestRESTSigV4Signer.java    | 146 +++++++++++++++++++
 build.gradle                                       |   4 +
 .../java/org/apache/iceberg/rest/HTTPClient.java   |  95 +++++++++++--
 .../java/org/apache/iceberg/rest/RESTCatalog.java  |   2 +-
 .../apache/iceberg/rest/RESTSessionCatalog.java    |   2 +-
 .../org/apache/iceberg/rest/TestHTTPClient.java    |  29 ++++
 8 files changed, 493 insertions(+), 10 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 0363942b6e..4b4806b86d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -48,6 +48,8 @@ import 
software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
 import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.signer.Signer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
 import software.amazon.awssdk.services.glue.GlueClientBuilder;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
@@ -628,6 +630,41 @@ public class AwsProperties implements Serializable {
    */
   public static final String LAKE_FORMATION_DB_NAME = "lakeformation.db-name";
 
+  /** Region to be used by the SigV4 protocol for signing requests. */
+  public static final String REST_SIGNER_REGION = "rest.signing-region";
+
+  /** The service name to be used by the SigV4 protocol for signing requests. */
+  public static final String REST_SIGNING_NAME = "rest.signing-name";
+
+  /** The default service name (API Gateway and lambda) used during SigV4 signing. */
+  public static final String REST_SIGNING_NAME_DEFAULT = "execute-api";
+
+  /**
+   * Configure the static access key ID used for SigV4 signing.
+   *
+   * <p>When set, the default client factory will use the basic or session credentials provided
+   * instead of reading the default credential chain to create access credentials. If {@link
+   * #REST_SESSION_TOKEN} is set, session credential is used, otherwise basic credential is used.
+   */
+  public static final String REST_ACCESS_KEY_ID = "rest.access-key-id";
+
+  /**
+   * Configure the static secret access key used for SigV4 signing.
+   *
+   * <p>When set, the default client factory will use the basic or session credentials provided
+   * instead of reading the default credential chain to create access credentials. If {@link
+   * #REST_SESSION_TOKEN} is set, session credential is used, otherwise basic credential is used.
+   */
+  public static final String REST_SECRET_ACCESS_KEY = "rest.secret-access-key";
+
+  /**
+   * Configure the static session token used for SigV4.
+   *
+   * <p>When set, the default client factory will use the session credentials provided instead of
+   * reading the default credential chain to create access credentials.
+   */
+  public static final String REST_SESSION_TOKEN = "rest.session-token";
+
   private static final String HTTP_CLIENT_PREFIX = "http-client.";
   private String httpClientType;
   private final Map<String, String> httpClientProperties;
@@ -678,6 +715,12 @@ public class AwsProperties implements Serializable {
   private final String s3SignerImpl;
   private final Map<String, String> allProperties;
 
+  private String restSigningRegion;
+  private String restSigningName;
+  private String restAccessKeyId;
+  private String restSecretAccessKey;
+  private String restSessionToken;
+
   public AwsProperties() {
     this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
     this.httpClientProperties = Collections.emptyMap();
@@ -728,6 +771,8 @@ public class AwsProperties implements Serializable {
     this.s3SignerImpl = null;
     this.allProperties = Maps.newHashMap();
 
+    this.restSigningName = REST_SIGNING_NAME_DEFAULT;
+
     ValidationException.check(
         s3KeyIdAccessKeyBothConfigured(),
         "S3 client access key ID and secret access key must be set at the same 
time");
@@ -856,6 +901,12 @@ public class AwsProperties implements Serializable {
     this.s3SignerImpl = properties.get(S3_SIGNER_IMPL);
     this.allProperties = SerializableMap.copyOf(properties);
 
+    this.restSigningRegion = properties.get(REST_SIGNER_REGION);
+    this.restSigningName = properties.getOrDefault(REST_SIGNING_NAME, 
REST_SIGNING_NAME_DEFAULT);
+    this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID);
+    this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY);
+    this.restSessionToken = properties.get(REST_SESSION_TOKEN);
+
     ValidationException.check(
         s3KeyIdAccessKeyBothConfigured(),
         "S3 client access key ID and secret access key must be set at the same 
time");
@@ -1227,6 +1278,23 @@ public class AwsProperties implements Serializable {
     configureEndpoint(builder, dynamoDbEndpoint);
   }
 
+  /**
+   * Returns the region used when signing REST requests.
+   *
+   * <p>If no region was configured, the region is looked up through {@link
+   * DefaultAwsRegionProviderChain} and its id is remembered in {@code restSigningRegion} so that
+   * later calls reuse it.
+   *
+   * @return the signing region
+   */
+  public Region restSigningRegion() {
+    String regionId = restSigningRegion;
+    if (regionId == null) {
+      regionId = DefaultAwsRegionProviderChain.builder().build().getRegion().id();
+      this.restSigningRegion = regionId;
+    }
+
+    return Region.of(regionId);
+  }
+
+  /**
+   * Returns the service name used when signing REST requests.
+   *
+   * @return the configured signing name
+   */
+  public String restSigningName() {
+    return this.restSigningName;
+  }
+
+  /**
+   * Builds the credentials provider used for signing REST requests by passing the configured REST
+   * access key id, secret access key and session token to {@code credentialsProvider}.
+   *
+   * @return the credentials provider for REST request signing
+   */
+  public AwsCredentialsProvider restCredentialsProvider() {
+    return credentialsProvider(restAccessKeyId, restSecretAccessKey, restSessionToken);
+  }
+
   private Set<Tag> toS3Tags(Map<String, String> properties, String prefix) {
     return PropertyUtil.propertiesWithPrefix(properties, 
prefix).entrySet().stream()
         .map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
diff --git a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java 
b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
new file mode 100644
index 0000000000..c1c8584163
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.iceberg.exceptions.RESTException;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.internal.SignerConstant;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.auth.signer.params.SignerChecksumParams;
+import software.amazon.awssdk.core.checksums.Algorithm;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * Provides a request interceptor for use with the HTTPClient that calculates the required signature
+ * for the SigV4 protocol and adds the necessary headers for all requests created by the client.
+ *
+ * <p>Headers already present on the request that conflict with the signed headers (notably an
+ * existing {@code Authorization} header) are kept on the request under a name prefixed with {@link
+ * #RELOCATED_HEADER_PREFIX}.
+ *
+ * <p>See <a
+ * href="https://docs.aws.amazon.com/general/latest/gr/signing-aws-api-requests.html">Signing AWS
+ * API requests</a> for details about the protocol.
+ */
+public class RESTSigV4Signer implements HttpRequestInterceptor {
+  /** Hex-encoded SHA-256 digest of an empty payload, sent as the checksum of body-less requests. */
+  static final String EMPTY_BODY_SHA256 =
+      "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+  /** Prefix prepended to the name of a header that is displaced by a signed header. */
+  static final String RELOCATED_HEADER_PREFIX = "Original-";
+
+  private final Aws4Signer signer = Aws4Signer.create();
+  private AwsCredentialsProvider credentialsProvider;
+
+  private String signingName;
+  private Region signingRegion;
+
+  /**
+   * Reads the signing region, signing name and credentials provider from the given properties.
+   *
+   * @param properties catalog properties, interpreted through {@link AwsProperties}
+   */
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+
+    this.signingRegion = awsProperties.restSigningRegion();
+    this.signingName = awsProperties.restSigningName();
+    this.credentialsProvider = awsProperties.restCredentialsProvider();
+  }
+
+  /**
+   * Signs the outgoing request and copies the signed headers back onto it.
+   *
+   * @param request the request about to be sent
+   * @param entity details of the request body, or null when the request has no body
+   * @param context the HTTP context (unused)
+   * @throws RESTException if the request URI is not valid
+   * @throws UnsupportedOperationException if the body is not a {@link StringEntity}
+   */
+  @Override
+  public void process(HttpRequest request, EntityDetails entity, HttpContext context) {
+    URI requestUri;
+
+    try {
+      requestUri = request.getUri();
+    } catch (URISyntaxException e) {
+      throw new RESTException(e, "Invalid uri for request: %s", request);
+    }
+
+    Aws4SignerParams params =
+        Aws4SignerParams.builder()
+            .signingName(signingName)
+            .signingRegion(signingRegion)
+            .awsCredentials(credentialsProvider.resolveCredentials())
+            .checksumParams(
+                SignerChecksumParams.builder()
+                    .algorithm(Algorithm.SHA256)
+                    .isStreamingRequest(false)
+                    .checksumHeaderName(SignerConstant.X_AMZ_CONTENT_SHA256)
+                    .build())
+            .build();
+
+    SdkHttpFullRequest.Builder sdkRequestBuilder = SdkHttpFullRequest.builder();
+
+    sdkRequestBuilder
+        .method(SdkHttpMethod.fromValue(request.getMethod()))
+        .protocol(request.getScheme())
+        .uri(requestUri)
+        .headers(convertHeaders(request.getHeaders()));
+
+    if (entity == null) {
+      // This is a workaround for the signer implementation incorrectly producing
+      // an invalid content checksum for empty body requests.
+      sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, EMPTY_BODY_SHA256);
+    } else if (entity instanceof StringEntity) {
+      sdkRequestBuilder.contentStreamProvider(
+          () -> {
+            try {
+              return ((StringEntity) entity).getContent();
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            }
+          });
+    } else {
+      throw new UnsupportedOperationException("Unsupported entity type: " + entity.getClass());
+    }
+
+    SdkHttpFullRequest signedSdkRequest = signer.sign(sdkRequestBuilder.build(), params);
+    updateRequestHeaders(request, signedSdkRequest.headers());
+  }
+
+  /**
+   * Groups the request headers by name into the multi-valued map handed to the signer, renaming
+   * any {@code Authorization} header with {@link #RELOCATED_HEADER_PREFIX}.
+   */
+  private Map<String, List<String>> convertHeaders(Header[] headers) {
+    return Arrays.stream(headers)
+        .collect(
+            Collectors.groupingBy(
+                // Relocate Authorization header as SigV4 takes precedence. HTTP header names are
+                // case-insensitive, so match the name regardless of case.
+                header ->
+                    HttpHeaders.AUTHORIZATION.equalsIgnoreCase(header.getName())
+                        ? RELOCATED_HEADER_PREFIX + header.getName()
+                        : header.getName(),
+                Collectors.mapping(Header::getValue, Collectors.toList())));
+  }
+
+  /**
+   * Replaces headers on the request with the signed headers. Existing values that are not part of
+   * the signed values for the same name are re-added under the relocated name.
+   */
+  private void updateRequestHeaders(HttpRequest request, Map<String, List<String>> headers) {
+    headers.forEach(
+        (name, values) -> {
+          if (request.containsHeader(name)) {
+            Header[] original = request.getHeaders(name);
+            request.removeHeaders(name);
+            for (Header header : original) {
+              // Relocate headers if there is a conflict with signed headers
+              if (!values.contains(header.getValue())) {
+                request.addHeader(RELOCATED_HEADER_PREFIX + name, header.getValue());
+              }
+            }
+          }
+
+          // Previous values were removed above, so append each signed value. Using setHeader here
+          // would overwrite and keep only the last value of a multi-valued signed header.
+          values.forEach(value -> request.addHeader(name, value));
+        });
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java 
b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java
new file mode 100644
index 0000000000..188d488db2
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.Header;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+import software.amazon.awssdk.auth.signer.internal.SignerConstant;
+
+/**
+ * Verifies against a mock server that an HTTPClient built with {@code rest.sigv4-enabled=true}
+ * sends a SigV4 Authorization header, relocates the pre-existing Authorization header, and sets
+ * the content checksum header.
+ */
+public class TestRESTSigV4Signer {
+  private static ClientAndServer mockServer;
+  private static HTTPClient client;
+
+  @BeforeClass
+  public static void beforeClass() {
+    mockServer = ClientAndServer.startClientAndServer();
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            "rest.sigv4-enabled",
+            "true",
+            // CI environment doesn't have credentials, but a value must be set for signing
+            AwsProperties.REST_SIGNER_REGION,
+            "us-west-2",
+            AwsProperties.REST_ACCESS_KEY_ID,
+            "id",
+            AwsProperties.REST_SECRET_ACCESS_KEY,
+            "secret");
+    client =
+        HTTPClient.builder(properties)
+            .uri("http://localhost:" + mockServer.getLocalPort())
+            // Pre-existing auth header that the signer is expected to relocate
+            .withHeader(HttpHeaders.AUTHORIZATION, "Bearer existing_token")
+            .build();
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    mockServer.stop();
+    client.close();
+  }
+
+  @Before
+  public void before() {
+    mockServer.reset();
+  }
+
+  @Test
+  public void signRequestWithoutBody() {
+    HttpRequest request =
+        HttpRequest.request()
+            .withMethod("GET")
+            .withPath("/v1/config")
+            // Require SigV4 Authorization
+            .withHeader(Header.header(HttpHeaders.AUTHORIZATION, "AWS4-HMAC-SHA256.*"))
+            // Require that conflicting auth header is relocated
+            .withHeader(
+                Header.header(RESTSigV4Signer.RELOCATED_HEADER_PREFIX + HttpHeaders.AUTHORIZATION))
+            // Require the empty body checksum
+            .withHeader(
+                Header.header(
+                    SignerConstant.X_AMZ_CONTENT_SHA256, RESTSigV4Signer.EMPTY_BODY_SHA256));
+
+    mockServer
+        .when(request)
+        .respond(HttpResponse.response().withStatusCode(HttpStatus.SC_OK).withBody("{}"));
+
+    ConfigResponse response =
+        client.get("v1/config", ConfigResponse.class, ImmutableMap.of(), e -> {});
+
+    mockServer.verify(request, VerificationTimes.exactly(1));
+    Assertions.assertThat(response).isNotNull();
+  }
+
+  @Test
+  public void signRequestWithBody() {
+    HttpRequest request =
+        HttpRequest.request()
+            .withMethod("POST")
+            .withPath("/v1/oauth/token")
+            // Require SigV4 Authorization
+            .withHeader(Header.header(HttpHeaders.AUTHORIZATION, "AWS4-HMAC-SHA256.*"))
+            // Require that conflicting auth header is relocated
+            .withHeader(
+                Header.header(RESTSigV4Signer.RELOCATED_HEADER_PREFIX + HttpHeaders.AUTHORIZATION))
+            // Require a body checksum is set
+            .withHeader(Header.header(SignerConstant.X_AMZ_CONTENT_SHA256));
+
+    mockServer
+        .when(request)
+        .respond(
+            HttpResponse.response()
+                .withStatusCode(HttpStatus.SC_OK)
+                .withBody(
+                    OAuth2Util.tokenResponseToJson(
+                        OAuthTokenResponse.builder()
+                            .withToken("fake_token")
+                            .withTokenType("bearer")
+                            .withIssuedTokenType("n/a")
+                            .build())));
+
+    Map<String, String> formData = Maps.newHashMap();
+    formData.put("client_id", "asdfasd");
+    formData.put("client_secret", "asdfasdf");
+    formData.put("scope", "catalog");
+
+    OAuthTokenResponse response =
+        client.postForm(
+            "v1/oauth/token", formData, OAuthTokenResponse.class, ImmutableMap.of(), e -> {});
+
+    mockServer.verify(request, VerificationTimes.exactly(1));
+    Assertions.assertThat(response).isNotNull();
+  }
+}
diff --git a/build.gradle b/build.gradle
index 0506825b3b..842ffa5be5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,8 @@ project(':iceberg-aws') {
       exclude group: 'com.google.code.gson', module: 'gson'
     }
 
+    compileOnly 'org.apache.httpcomponents.client5:httpclient5'
+
     testImplementation 'software.amazon.awssdk:iam'
     testImplementation 'software.amazon.awssdk:s3control'
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
@@ -469,6 +471,8 @@ project(':iceberg-aws') {
     testImplementation "org.xerial:sqlite-jdbc"
     testImplementation "org.testcontainers:testcontainers"
     testImplementation "org.apache.httpcomponents.client5:httpclient5"
+    testImplementation 'org.mock-server:mockserver-netty'
+    testImplementation 'org.mock-server:mockserver-client-java'
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java 
b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index f9e42486ca..a95c7dd1b5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -26,29 +26,36 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
 import org.apache.hc.client5.http.impl.classic.HttpClients;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.Method;
 import org.apache.hc.core5.http.ParseException;
 import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.net.URIBuilder;
 import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.RESTException;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +63,9 @@ import org.slf4j.LoggerFactory;
 public class HTTPClient implements RESTClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class);
+  private static final String SIGV4_ENABLED = "rest.sigv4-enabled";
+  private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL =
+      "org.apache.iceberg.aws.RESTSigV4Signer";
   @VisibleForTesting static final String CLIENT_VERSION_HEADER = 
"X-Client-Version";
 
   @VisibleForTesting
@@ -64,13 +74,29 @@ public class HTTPClient implements RESTClient {
   private final String uri;
   private final CloseableHttpClient httpClient;
   private final ObjectMapper mapper;
-  private final Map<String, String> baseHeaders;
 
-  private HTTPClient(String uri, Map<String, String> baseHeaders, ObjectMapper 
objectMapper) {
+  private HTTPClient(
+      String uri,
+      Map<String, String> baseHeaders,
+      ObjectMapper objectMapper,
+      HttpRequestInterceptor requestInterceptor) {
     this.uri = uri;
-    this.httpClient = HttpClients.createDefault();
-    this.baseHeaders = baseHeaders != null ? baseHeaders : ImmutableMap.of();
     this.mapper = objectMapper;
+
+    HttpClientBuilder clientBuilder = HttpClients.custom();
+
+    if (baseHeaders != null) {
+      clientBuilder.setDefaultHeaders(
+          baseHeaders.entrySet().stream()
+              .map(e -> new BasicHeader(e.getKey(), e.getValue()))
+              .collect(Collectors.toList()));
+    }
+
+    if (requestInterceptor != null) {
+      clientBuilder.addRequestInterceptorLast(requestInterceptor);
+    }
+
+    this.httpClient = clientBuilder.build();
   }
 
   private static String extractResponseBodyAsString(CloseableHttpResponse 
response) {
@@ -352,7 +378,6 @@ public class HTTPClient implements RESTClient {
     // Many systems require that content type is set regardless and will fail, 
even on an empty
     // bodied request.
     request.setHeader(HttpHeaders.CONTENT_TYPE, bodyMimeType);
-    baseHeaders.forEach(request::setHeader);
     requestHeaders.forEach(request::setHeader);
   }
 
@@ -361,16 +386,63 @@ public class HTTPClient implements RESTClient {
     httpClient.close(CloseMode.GRACEFUL);
   }
 
+  /**
+   * Instantiates a request interceptor by class name and hands it the given properties.
+   *
+   * <p>The class is created through its no-arg constructor using this class's class loader. If an
+   * {@code initialize(Map)} method can be resolved on the implementation it is invoked with {@code
+   * properties}; otherwise that step is a no-op.
+   *
+   * @param impl fully-qualified class name of the interceptor implementation
+   * @param properties properties passed to the interceptor's {@code initialize} method
+   * @return the created interceptor
+   * @throws IllegalArgumentException if the class has no no-arg constructor or does not implement
+   *     {@link HttpRequestInterceptor}
+   */
+  @VisibleForTesting
+  static HttpRequestInterceptor loadInterceptorDynamically(
+      String impl, Map<String, String> properties) {
+    DynConstructors.Ctor<HttpRequestInterceptor> noArgCtor;
+    try {
+      noArgCtor =
+          DynConstructors.builder(HttpRequestInterceptor.class)
+              .loader(HTTPClient.class.getClassLoader())
+              .impl(impl)
+              .buildChecked();
+    } catch (NoSuchMethodException e) {
+      String message =
+          String.format(
+              "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl);
+      throw new IllegalArgumentException(message, e);
+    }
+
+    HttpRequestInterceptor interceptor;
+    try {
+      interceptor = noArgCtor.newInstance();
+    } catch (ClassCastException e) {
+      String message =
+          String.format("Cannot initialize, %s does not implement RequestInterceptor", impl);
+      throw new IllegalArgumentException(message, e);
+    }
+
+    // Optional hook: falls back to a no-op when the implementation has no initialize(Map)
+    DynMethods.builder("initialize")
+        .hiddenImpl(impl, Map.class)
+        .orNoop()
+        .build(interceptor)
+        .invoke(properties);
+
+    return interceptor;
+  }
+
+  /**
+   * @return http client builder
+   * @deprecated will be removed in 1.3.0; use {@link HTTPClient#builder(Map)}
+   */
+  @Deprecated
   public static Builder builder() {
-    return new Builder();
+    return new Builder(ImmutableMap.of());
+  }
+
+  public static Builder builder(Map<String, String> properties) {
+    return new Builder(properties);
   }
 
   public static class Builder {
+    private final Map<String, String> properties;
     private final Map<String, String> baseHeaders = Maps.newHashMap();
     private String uri;
     private ObjectMapper mapper = RESTObjectMapper.mapper();
 
-    private Builder() {}
+    private Builder(Map<String, String> properties) {
+      this.properties = properties;
+    }
 
     public Builder uri(String baseUri) {
       Preconditions.checkNotNull(baseUri, "Invalid uri for http client: null");
@@ -396,7 +468,14 @@ public class HTTPClient implements RESTClient {
     public HTTPClient build() {
       withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion());
       withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, 
IcebergBuild.gitCommitShortId());
-      return new HTTPClient(uri, baseHeaders, mapper);
+
+      HttpRequestInterceptor interceptor = null;
+
+      if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) {
+        interceptor = 
loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties);
+      }
+
+      return new HTTPClient(uri, baseHeaders, mapper, interceptor);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 83a08b1a2c..bec9feccfd 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -49,7 +49,7 @@ public class RESTCatalog
   public RESTCatalog() {
     this(
         SessionCatalog.SessionContext.createEmpty(),
-        config -> 
HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+        config -> 
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
   }
 
   public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index e214759802..80f31aba88 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -123,7 +123,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
   }
 
   public RESTSessionCatalog() {
-    this(config -> 
HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+    this(config -> 
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
   }
 
   RESTSessionCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java 
b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
index 6445fd237e..1bdf880fe8 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
@@ -35,6 +35,10 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -118,6 +122,18 @@ public class TestHTTPClient {
     testHttpMethodOnFailure(HttpMethod.HEAD);
   }
 
+  /** Checks that a dynamically loaded interceptor is created and receives the properties. */
+  @Test
+  public void testDynamicHttpRequestInterceptorLoading() {
+    Map<String, String> properties = ImmutableMap.of("key", "val");
+    String interceptorImpl = TestHttpRequestInterceptor.class.getName();
+
+    HttpRequestInterceptor loaded =
+        HTTPClient.loadInterceptorDynamically(interceptorImpl, properties);
+
+    assertThat(loaded).isInstanceOf(TestHttpRequestInterceptor.class);
+
+    TestHttpRequestInterceptor typed = (TestHttpRequestInterceptor) loaded;
+    assertThat(typed.properties).isEqualTo(properties);
+  }
+
   public static void testHttpMethodOnSuccess(HttpMethod method) throws 
JsonProcessingException {
     Item body = new Item(0L, "hank");
     int statusCode = 200;
@@ -265,4 +281,17 @@ public class TestHTTPClient {
       return Objects.equals(id, item.id) && Objects.equals(data, item.data);
     }
   }
+
+  /** Interceptor stub that only records the properties passed to {@code initialize}. */
+  public static class TestHttpRequestInterceptor implements HttpRequestInterceptor {
+    private Map<String, String> properties;
+
+    /** Stores the given properties so the test can inspect them. */
+    public void initialize(Map<String, String> props) {
+      properties = props;
+    }
+
+    @Override
+    public void process(
+        org.apache.hc.core5.http.HttpRequest request, EntityDetails entity, HttpContext context)
+        throws HttpException, IOException {
+      // Intentionally empty: requests pass through unchanged.
+    }
+  }
 }

Reply via email to