This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 50863d7c02 Add support for SigV4 request signing to REST Catalog
requests (#6951)
50863d7c02 is described below
commit 50863d7c028ea36c4cfb5d857f1324f7f978e579
Author: Daniel Weeks <[email protected]>
AuthorDate: Mon Mar 6 15:22:57 2023 -0800
Add support for SigV4 request signing to REST Catalog requests (#6951)
---
.../java/org/apache/iceberg/aws/AwsProperties.java | 68 +++++++++
.../org/apache/iceberg/aws/RESTSigV4Signer.java | 157 +++++++++++++++++++++
.../apache/iceberg/aws/TestRESTSigV4Signer.java | 146 +++++++++++++++++++
build.gradle | 4 +
.../java/org/apache/iceberg/rest/HTTPClient.java | 95 +++++++++++--
.../java/org/apache/iceberg/rest/RESTCatalog.java | 2 +-
.../apache/iceberg/rest/RESTSessionCatalog.java | 2 +-
.../org/apache/iceberg/rest/TestHTTPClient.java | 29 ++++
8 files changed, 493 insertions(+), 10 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 0363942b6e..4b4806b86d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -48,6 +48,8 @@ import
software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.signer.Signer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
@@ -628,6 +630,41 @@ public class AwsProperties implements Serializable {
*/
public static final String LAKE_FORMATION_DB_NAME = "lakeformation.db-name";
+ /** Region to be used by the SigV4 protocol for signing requests. */
+ public static final String REST_SIGNER_REGION = "rest.signing-region";
+
+ /** The service name to be used by the SigV4 protocol for signing requests.
*/
+ public static final String REST_SIGNING_NAME = "rest.signing-name";
+
+ /** The default service name (API Gateway and Lambda) used during SigV4 signing. */
+ public static final String REST_SIGNING_NAME_DEFAULT = "execute-api";
+
+ /**
+ * Configure the static access key ID used for SigV4 signing.
+ *
+ * <p>When set, the default client factory will use the basic or session credentials provided
+ * instead of reading the default credential chain to create access credentials. If {@link
+ * #REST_SESSION_TOKEN} is set, session credentials are used; otherwise basic credentials are used.
+ */
+ public static final String REST_ACCESS_KEY_ID = "rest.access-key-id";
+
+ /**
+ * Configure the static secret access key used for SigV4 signing.
+ *
+ * <p>When set, the default client factory will use the basic or session credentials provided
+ * instead of reading the default credential chain to create access credentials. If {@link
+ * #REST_SESSION_TOKEN} is set, session credentials are used; otherwise basic credentials are used.
+ */
+ public static final String REST_SECRET_ACCESS_KEY = "rest.secret-access-key";
+
+ /**
+ * Configure the static session token used for SigV4.
+ *
+ * <p>When set, the default client factory will use the session credentials
provided instead of
+ * reading the default credential chain to create access credentials.
+ */
+ public static final String REST_SESSION_TOKEN = "rest.session-token";
+
private static final String HTTP_CLIENT_PREFIX = "http-client.";
private String httpClientType;
private final Map<String, String> httpClientProperties;
@@ -678,6 +715,12 @@ public class AwsProperties implements Serializable {
private final String s3SignerImpl;
private final Map<String, String> allProperties;
+ private String restSigningRegion;
+ private String restSigningName;
+ private String restAccessKeyId;
+ private String restSecretAccessKey;
+ private String restSessionToken;
+
public AwsProperties() {
this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
this.httpClientProperties = Collections.emptyMap();
@@ -728,6 +771,8 @@ public class AwsProperties implements Serializable {
this.s3SignerImpl = null;
this.allProperties = Maps.newHashMap();
+ this.restSigningName = REST_SIGNING_NAME_DEFAULT;
+
ValidationException.check(
s3KeyIdAccessKeyBothConfigured(),
"S3 client access key ID and secret access key must be set at the same
time");
@@ -856,6 +901,12 @@ public class AwsProperties implements Serializable {
this.s3SignerImpl = properties.get(S3_SIGNER_IMPL);
this.allProperties = SerializableMap.copyOf(properties);
+ this.restSigningRegion = properties.get(REST_SIGNER_REGION);
+ this.restSigningName = properties.getOrDefault(REST_SIGNING_NAME,
REST_SIGNING_NAME_DEFAULT);
+ this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID);
+ this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY);
+ this.restSessionToken = properties.get(REST_SESSION_TOKEN);
+
ValidationException.check(
s3KeyIdAccessKeyBothConfigured(),
"S3 client access key ID and secret access key must be set at the same
time");
@@ -1227,6 +1278,23 @@ public class AwsProperties implements Serializable {
configureEndpoint(builder, dynamoDbEndpoint);
}
+ public Region restSigningRegion() {
+ if (restSigningRegion == null) {
+ this.restSigningRegion =
DefaultAwsRegionProviderChain.builder().build().getRegion().id();
+ }
+
+ return Region.of(restSigningRegion);
+ }
+
+ public String restSigningName() {
+ return restSigningName;
+ }
+
+ public AwsCredentialsProvider restCredentialsProvider() {
+ return credentialsProvider(
+ this.restAccessKeyId, this.restSecretAccessKey, this.restSessionToken);
+ }
+
private Set<Tag> toS3Tags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties,
prefix).entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
diff --git a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
new file mode 100644
index 0000000000..c1c8584163
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.iceberg.exceptions.RESTException;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.internal.SignerConstant;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.auth.signer.params.SignerChecksumParams;
+import software.amazon.awssdk.core.checksums.Algorithm;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * Provides a request interceptor for use with the HTTPClient that calculates the required signature
+ * for the SigV4 protocol and adds the necessary headers for all requests created by the client.
+ *
+ * <p>See <a
+ * href="https://docs.aws.amazon.com/general/latest/gr/signing-aws-api-requests.html">Signing AWS
+ * API requests</a> for details about the protocol.
+ */
+public class RESTSigV4Signer implements HttpRequestInterceptor {
+ static final String EMPTY_BODY_SHA256 =
+ "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+ static final String RELOCATED_HEADER_PREFIX = "Original-";
+
+ private final Aws4Signer signer = Aws4Signer.create();
+ private AwsCredentialsProvider credentialsProvider;
+
+ private String signingName;
+ private Region signingRegion;
+
+ public void initialize(Map<String, String> properties) {
+ AwsProperties awsProperties = new AwsProperties(properties);
+
+ this.signingRegion = awsProperties.restSigningRegion();
+ this.signingName = awsProperties.restSigningName();
+ this.credentialsProvider = awsProperties.restCredentialsProvider();
+ }
+
+ @Override
+ public void process(HttpRequest request, EntityDetails entity, HttpContext
context) {
+ URI requestUri;
+
+ try {
+ requestUri = request.getUri();
+ } catch (URISyntaxException e) {
+ throw new RESTException(e, "Invalid uri for request: %s", request);
+ }
+
+ Aws4SignerParams params =
+ Aws4SignerParams.builder()
+ .signingName(signingName)
+ .signingRegion(signingRegion)
+ .awsCredentials(credentialsProvider.resolveCredentials())
+ .checksumParams(
+ SignerChecksumParams.builder()
+ .algorithm(Algorithm.SHA256)
+ .isStreamingRequest(false)
+ .checksumHeaderName(SignerConstant.X_AMZ_CONTENT_SHA256)
+ .build())
+ .build();
+
+ SdkHttpFullRequest.Builder sdkRequestBuilder =
SdkHttpFullRequest.builder();
+
+ sdkRequestBuilder
+ .method(SdkHttpMethod.fromValue(request.getMethod()))
+ .protocol(request.getScheme())
+ .uri(requestUri)
+ .headers(convertHeaders(request.getHeaders()));
+
+ if (entity == null) {
+ // This is a workaround for the signer implementation producing an invalid
+ // content checksum for empty-body requests.
+ sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256,
EMPTY_BODY_SHA256);
+ } else if (entity instanceof StringEntity) {
+ sdkRequestBuilder.contentStreamProvider(
+ () -> {
+ try {
+ return ((StringEntity) entity).getContent();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } else {
+ throw new UnsupportedOperationException("Unsupported entity type: " +
entity.getClass());
+ }
+
+ SdkHttpFullRequest signedSdkRequest =
signer.sign(sdkRequestBuilder.build(), params);
+ updateRequestHeaders(request, signedSdkRequest.headers());
+ }
+
+ private Map<String, List<String>> convertHeaders(Header[] headers) {
+ return Arrays.stream(headers)
+ .collect(
+ Collectors.groupingBy(
+ // Relocate Authorization header as SigV4 takes precedence
+ header ->
+ HttpHeaders.AUTHORIZATION.equals(header.getName())
+ ? RELOCATED_HEADER_PREFIX + header.getName()
+ : header.getName(),
+ Collectors.mapping(Header::getValue, Collectors.toList())));
+ }
+
+ private void updateRequestHeaders(HttpRequest request, Map<String,
List<String>> headers) {
+ headers.forEach(
+ (name, values) -> {
+ if (request.containsHeader(name)) {
+ Header[] original = request.getHeaders(name);
+ request.removeHeaders(name);
+ Arrays.asList(original)
+ .forEach(
+ header -> {
+ // Relocate headers if there is a conflict with signed
headers
+ if (!values.contains(header.getValue())) {
+ request.addHeader(RELOCATED_HEADER_PREFIX + name,
header.getValue());
+ }
+ });
+ }
+
+ values.forEach(value -> request.setHeader(name, value));
+ });
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java
b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java
new file mode 100644
index 0000000000..188d488db2
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4Signer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.Header;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+import software.amazon.awssdk.auth.signer.internal.SignerConstant;
+
+public class TestRESTSigV4Signer {
+ private static ClientAndServer mockServer;
+ private static HTTPClient client;
+
+ @BeforeClass
+ public static void beforeClass() {
+ mockServer = ClientAndServer.startClientAndServer();
+
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "rest.sigv4-enabled",
+ "true",
+ // CI environment doesn't have credentials, but a value must be
set for signing
+ AwsProperties.REST_SIGNER_REGION,
+ "us-west-2",
+ AwsProperties.REST_ACCESS_KEY_ID,
+ "id",
+ AwsProperties.REST_SECRET_ACCESS_KEY,
+ "secret");
+ client =
+ HTTPClient.builder(properties)
+ .uri("http://localhost:" + mockServer.getLocalPort())
+ .withHeader(HttpHeaders.AUTHORIZATION, "Bearer existing_token")
+ .build();
+ }
+
+ @AfterClass
+ public static void afterClass() throws IOException {
+ mockServer.stop();
+ client.close();
+ }
+
+ @Before
+ public void before() {
+ mockServer.reset();
+ }
+
+ @Test
+ public void signRequestWithoutBody() {
+ HttpRequest request =
+ HttpRequest.request()
+ .withMethod("GET")
+ .withPath("/v1/config")
+ // Require SigV4 Authorization
+ .withHeader(Header.header(HttpHeaders.AUTHORIZATION,
"AWS4-HMAC-SHA256.*"))
+ // Require that conflicting auth header is relocated
+ .withHeader(
+ Header.header(RESTSigV4Signer.RELOCATED_HEADER_PREFIX +
HttpHeaders.AUTHORIZATION))
+ // Require the empty body checksum
+ .withHeader(
+ Header.header(
+ SignerConstant.X_AMZ_CONTENT_SHA256,
RESTSigV4Signer.EMPTY_BODY_SHA256));
+
+ mockServer
+ .when(request)
+
.respond(HttpResponse.response().withStatusCode(HttpStatus.SC_OK).withBody("{}"));
+
+ ConfigResponse response =
+ client.get("v1/config", ConfigResponse.class, ImmutableMap.of(), e ->
{});
+
+ mockServer.verify(request, VerificationTimes.exactly(1));
+ Assertions.assertThat(response).isNotNull();
+ }
+
+ @Test
+ public void signRequestWithBody() {
+ HttpRequest request =
+ HttpRequest.request()
+ .withMethod("POST")
+ .withPath("/v1/oauth/token")
+ // Require SigV4 Authorization
+ .withHeader(Header.header(HttpHeaders.AUTHORIZATION,
"AWS4-HMAC-SHA256.*"))
+ // Require that conflicting auth header is relocated
+ .withHeader(
+ Header.header(RESTSigV4Signer.RELOCATED_HEADER_PREFIX +
HttpHeaders.AUTHORIZATION))
+ // Require a body checksum is set
+ .withHeader(Header.header(SignerConstant.X_AMZ_CONTENT_SHA256));
+
+ mockServer
+ .when(request)
+ .respond(
+ HttpResponse.response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withBody(
+ OAuth2Util.tokenResponseToJson(
+ OAuthTokenResponse.builder()
+ .withToken("fake_token")
+ .withTokenType("bearer")
+ .withIssuedTokenType("n/a")
+ .build())));
+
+ Map<String, String> formData = Maps.newHashMap();
+ formData.put("client_id", "asdfasd");
+ formData.put("client_secret", "asdfasdf");
+ formData.put("scope", "catalog");
+
+ OAuthTokenResponse response =
+ client.postForm(
+ "v1/oauth/token", formData, OAuthTokenResponse.class,
ImmutableMap.of(), e -> {});
+
+ mockServer.verify(request, VerificationTimes.exactly(1));
+ Assertions.assertThat(response).isNotNull();
+ }
+}
diff --git a/build.gradle b/build.gradle
index 0506825b3b..842ffa5be5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,8 @@ project(':iceberg-aws') {
exclude group: 'com.google.code.gson', module: 'gson'
}
+ compileOnly 'org.apache.httpcomponents.client5:httpclient5'
+
testImplementation 'software.amazon.awssdk:iam'
testImplementation 'software.amazon.awssdk:s3control'
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
@@ -469,6 +471,8 @@ project(':iceberg-aws') {
testImplementation "org.xerial:sqlite-jdbc"
testImplementation "org.testcontainers:testcontainers"
testImplementation "org.apache.httpcomponents.client5:httpclient5"
+ testImplementation 'org.mock-server:mockserver-netty'
+ testImplementation 'org.mock-server:mockserver-client-java'
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index f9e42486ca..a95c7dd1b5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -26,29 +26,36 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.RESTException;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +63,9 @@ import org.slf4j.LoggerFactory;
public class HTTPClient implements RESTClient {
private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class);
+ private static final String SIGV4_ENABLED = "rest.sigv4-enabled";
+ private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL =
+ "org.apache.iceberg.aws.RESTSigV4Signer";
@VisibleForTesting static final String CLIENT_VERSION_HEADER =
"X-Client-Version";
@VisibleForTesting
@@ -64,13 +74,29 @@ public class HTTPClient implements RESTClient {
private final String uri;
private final CloseableHttpClient httpClient;
private final ObjectMapper mapper;
- private final Map<String, String> baseHeaders;
- private HTTPClient(String uri, Map<String, String> baseHeaders, ObjectMapper
objectMapper) {
+ private HTTPClient(
+ String uri,
+ Map<String, String> baseHeaders,
+ ObjectMapper objectMapper,
+ HttpRequestInterceptor requestInterceptor) {
this.uri = uri;
- this.httpClient = HttpClients.createDefault();
- this.baseHeaders = baseHeaders != null ? baseHeaders : ImmutableMap.of();
this.mapper = objectMapper;
+
+ HttpClientBuilder clientBuilder = HttpClients.custom();
+
+ if (baseHeaders != null) {
+ clientBuilder.setDefaultHeaders(
+ baseHeaders.entrySet().stream()
+ .map(e -> new BasicHeader(e.getKey(), e.getValue()))
+ .collect(Collectors.toList()));
+ }
+
+ if (requestInterceptor != null) {
+ clientBuilder.addRequestInterceptorLast(requestInterceptor);
+ }
+
+ this.httpClient = clientBuilder.build();
}
private static String extractResponseBodyAsString(CloseableHttpResponse
response) {
@@ -352,7 +378,6 @@ public class HTTPClient implements RESTClient {
// Many systems require that content type is set regardless and will fail,
even on an empty
// bodied request.
request.setHeader(HttpHeaders.CONTENT_TYPE, bodyMimeType);
- baseHeaders.forEach(request::setHeader);
requestHeaders.forEach(request::setHeader);
}
@@ -361,16 +386,63 @@ public class HTTPClient implements RESTClient {
httpClient.close(CloseMode.GRACEFUL);
}
+ @VisibleForTesting
+ static HttpRequestInterceptor loadInterceptorDynamically(
+ String impl, Map<String, String> properties) {
+ HttpRequestInterceptor instance;
+
+ DynConstructors.Ctor<HttpRequestInterceptor> ctor;
+ try {
+ ctor =
+ DynConstructors.builder(HttpRequestInterceptor.class)
+ .loader(HTTPClient.class.getClassLoader())
+ .impl(impl)
+ .buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot initialize RequestInterceptor, missing no-arg
constructor: %s", impl),
+ e);
+ }
+
+ try {
+ instance = ctor.newInstance();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot initialize, %s does not implement
RequestInterceptor", impl), e);
+ }
+
+ DynMethods.builder("initialize")
+ .hiddenImpl(impl, Map.class)
+ .orNoop()
+ .build(instance)
+ .invoke(properties);
+
+ return instance;
+ }
+
+ /**
+ * @return http client builder
+ * @deprecated will be removed in 1.3.0; use {@link HTTPClient#builder(Map)}
+ */
+ @Deprecated
public static Builder builder() {
- return new Builder();
+ return new Builder(ImmutableMap.of());
+ }
+
+ public static Builder builder(Map<String, String> properties) {
+ return new Builder(properties);
}
public static class Builder {
+ private final Map<String, String> properties;
private final Map<String, String> baseHeaders = Maps.newHashMap();
private String uri;
private ObjectMapper mapper = RESTObjectMapper.mapper();
- private Builder() {}
+ private Builder(Map<String, String> properties) {
+ this.properties = properties;
+ }
public Builder uri(String baseUri) {
Preconditions.checkNotNull(baseUri, "Invalid uri for http client: null");
@@ -396,7 +468,14 @@ public class HTTPClient implements RESTClient {
public HTTPClient build() {
withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion());
withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER,
IcebergBuild.gitCommitShortId());
- return new HTTPClient(uri, baseHeaders, mapper);
+
+ HttpRequestInterceptor interceptor = null;
+
+ if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) {
+ interceptor =
loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties);
+ }
+
+ return new HTTPClient(uri, baseHeaders, mapper, interceptor);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 83a08b1a2c..bec9feccfd 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -49,7 +49,7 @@ public class RESTCatalog
public RESTCatalog() {
this(
SessionCatalog.SessionContext.createEmpty(),
- config ->
HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+ config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
}
public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index e214759802..80f31aba88 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -123,7 +123,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
}
public RESTSessionCatalog() {
- this(config ->
HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+ this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
}
RESTSessionCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
index 6445fd237e..1bdf880fe8 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
@@ -35,6 +35,10 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -118,6 +122,18 @@ public class TestHTTPClient {
testHttpMethodOnFailure(HttpMethod.HEAD);
}
+ @Test
+ public void testDynamicHttpRequestInterceptorLoading() {
+ Map<String, String> properties = ImmutableMap.of("key", "val");
+
+ HttpRequestInterceptor interceptor =
+ HTTPClient.loadInterceptorDynamically(
+ TestHttpRequestInterceptor.class.getName(), properties);
+
+ assertThat(interceptor).isInstanceOf(TestHttpRequestInterceptor.class);
+ assertThat(((TestHttpRequestInterceptor)
interceptor).properties).isEqualTo(properties);
+ }
+
public static void testHttpMethodOnSuccess(HttpMethod method) throws
JsonProcessingException {
Item body = new Item(0L, "hank");
int statusCode = 200;
@@ -265,4 +281,17 @@ public class TestHTTPClient {
return Objects.equals(id, item.id) && Objects.equals(data, item.data);
}
}
+
+ public static class TestHttpRequestInterceptor implements
HttpRequestInterceptor {
+ private Map<String, String> properties;
+
+ public void initialize(Map<String, String> props) {
+ this.properties = props;
+ }
+
+ @Override
+ public void process(
+ org.apache.hc.core5.http.HttpRequest request, EntityDetails entity,
HttpContext context)
+ throws HttpException, IOException {}
+ }
}