This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c3443f65d4 AWS: Add socket connection timeout for Apache Http Builder
(#5787)
c3443f65d4 is described below
commit c3443f65d4e708c77602efc1bf91348eb80f48b9
Author: Rushan Jiang <[email protected]>
AuthorDate: Sat Sep 24 19:12:01 2022 -0400
AWS: Add socket connection timeout for Apache Http Builder (#5787)
---
.../java/org/apache/iceberg/aws/AwsProperties.java | 45 ++++++-
.../org/apache/iceberg/aws/TestAwsProperties.java | 140 +++++++++++++++++++++
.../java/org/apache/iceberg/util/PropertyUtil.java | 8 ++
3 files changed, 192 insertions(+), 1 deletion(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 8ac11a04e0..639f61d934 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.aws;
import java.io.Serializable;
import java.net.URI;
+import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -27,6 +28,7 @@ import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.exceptions.ValidationException;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -360,6 +362,28 @@ public class AwsProperties implements Serializable {
public static final String HTTP_CLIENT_TYPE_DEFAULT =
HTTP_CLIENT_TYPE_URLCONNECTION;
+ /**
+ * Used to configure the connection timeout in milliseconds for {@link
+ * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+ * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}.
+ *
+ * <p>The value is parsed as a long. When the property is absent, this class does not call
+ * {@code connectionTimeout} on the builder at all.
+ *
+ * <p>For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+ */
+ public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS =
+ "http-client.apache.connection-timeout-ms";
+
+ /**
+ * Used to configure the socket timeout in milliseconds for {@link
+ * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+ * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}.
+ *
+ * <p>The value is parsed as a long. When the property is absent, this class does not call
+ * {@code socketTimeout} on the builder at all.
+ *
+ * <p>For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+ */
+ public static final String HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS =
+ "http-client.apache.socket-timeout-ms";
+
/**
* Used by {@link S3FileIO} to tag objects when writing. To set, we can pass
a catalog property.
*
@@ -450,6 +474,8 @@ public class AwsProperties implements Serializable {
public static final String LAKE_FORMATION_DB_NAME = "lakeformation.db-name";
private String httpClientType;
+ private Long httpClientApacheConnectionTimeoutMs;
+ private Long httpClientApacheSocketTimeoutMs;
private final Set<software.amazon.awssdk.services.sts.model.Tag>
stsClientAssumeRoleTags;
private String clientAssumeRoleArn;
@@ -494,6 +520,8 @@ public class AwsProperties implements Serializable {
public AwsProperties() {
this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
+ this.httpClientApacheConnectionTimeoutMs = null;
+ this.httpClientApacheSocketTimeoutMs = null;
this.stsClientAssumeRoleTags = Sets.newHashSet();
this.clientAssumeRoleArn = null;
@@ -545,6 +573,10 @@ public class AwsProperties implements Serializable {
public AwsProperties(Map<String, String> properties) {
this.httpClientType =
PropertyUtil.propertyAsString(properties, HTTP_CLIENT_TYPE,
HTTP_CLIENT_TYPE_DEFAULT);
+ this.httpClientApacheConnectionTimeoutMs =
+ PropertyUtil.propertyAsNullableLong(properties,
HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS);
+ this.httpClientApacheSocketTimeoutMs =
+ PropertyUtil.propertyAsNullableLong(properties,
HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS);
this.stsClientAssumeRoleTags = toStsTags(properties,
CLIENT_ASSUME_ROLE_TAGS_PREFIX);
this.clientAssumeRoleArn = properties.get(CLIENT_ASSUME_ROLE_ARN);
@@ -902,7 +934,8 @@ public class AwsProperties implements Serializable {
builder.httpClientBuilder(UrlConnectionHttpClient.builder());
break;
case HTTP_CLIENT_TYPE_APACHE:
- builder.httpClientBuilder(ApacheHttpClient.builder());
+ builder.httpClientBuilder(
+
ApacheHttpClient.builder().applyMutation(this::configureApacheHttpClientBuilder));
break;
default:
throw new IllegalArgumentException("Unrecognized HTTP client type " +
httpClientType);
@@ -990,4 +1023,14 @@ public class AwsProperties implements Serializable {
builder.endpointOverride(URI.create(endpoint));
}
}
+
+ @VisibleForTesting // not private so TestAwsProperties can invoke it directly on a spied builder
+ <T extends ApacheHttpClient.Builder> void configureApacheHttpClientBuilder(T
builder) { // applies the optional Apache timeouts held by this instance to the given builder
+ if (httpClientApacheConnectionTimeoutMs != null) { // null: no connection timeout was configured
+
builder.connectionTimeout(Duration.ofMillis(httpClientApacheConnectionTimeoutMs));
+ }
+ if (httpClientApacheSocketTimeoutMs != null) { // null: no socket timeout was configured
+
builder.socketTimeout(Duration.ofMillis(httpClientApacheSocketTimeoutMs));
+ }
+ }
}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
index 1cb705c37f..bd3b54fd99 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws;
+import java.time.Duration;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -29,6 +30,10 @@ import
software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -204,4 +209,139 @@ public class TestAwsProperties {
"secret",
capturedAwsCredentialsProvider.resolveCredentials().secretAccessKey());
}
+
+ @Test
+ public void testUrlHttpClientConfiguration() { // "urlconnection" must select a UrlConnectionHttpClient builder
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_TYPE, "urlconnection");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class); // mock, so the argument passed to httpClientBuilder can be captured
+ ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
+ ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+
+ awsProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
+
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
+ SdkHttpClient.Builder capturedHttpClientBuilder =
httpClientBuilderCaptor.getValue();
+
+ Assert.assertTrue(
+ "Should use url connection http client",
+ capturedHttpClientBuilder instanceof UrlConnectionHttpClient.Builder); // only the builder type is checked, not its settings
+ }
+
+ @Test
+ public void testApacheHttpClientConfiguration() { // "apache" must select an ApacheHttpClient builder
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_TYPE, "apache");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class); // mock, so the argument passed to httpClientBuilder can be captured
+ ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
+ ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+
+ awsProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
+
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
+ SdkHttpClient.Builder capturedHttpClientBuilder =
httpClientBuilderCaptor.getValue();
+ Assert.assertTrue(
+ "Should use apache http client",
+ capturedHttpClientBuilder instanceof ApacheHttpClient.Builder); // only the builder type is checked, not its timeouts
+ }
+
+ @Test
+ public void testInvalidHttpClientType() { // an unknown client type must be rejected when the configuration is applied
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_TYPE, "test"); // neither "urlconnection" nor "apache"
+ AwsProperties awsProperties = new AwsProperties(properties); // construction itself is not expected to throw here
+ S3ClientBuilder s3ClientBuilder = S3Client.builder();
+
+ AssertHelpers.assertThrows(
+ "should not support http client types other than urlconnection and
apache",
+ IllegalArgumentException.class,
+ "Unrecognized HTTP client type", // text from the message thrown for unrecognized client types
+ () -> awsProperties.applyHttpClientConfigurations(s3ClientBuilder));
+ }
+
+ @Test
+ public void testApacheConnectionSocketTimeoutConfiguration() { // both timeouts set: both must reach the builder
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS, "100");
+ properties.put(AwsProperties.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS,
"200");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
+ ApacheHttpClient.Builder spyApacheHttpClientBuilder =
Mockito.spy(apacheHttpClientBuilder); // spy on a real builder so the calls made to it can be verified
+ ArgumentCaptor<Duration> socketTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class);
+ ArgumentCaptor<Duration> connectionTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class);
+
+ awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder); // invoked directly, so HTTP_CLIENT_TYPE is not set in this test
+
Mockito.verify(spyApacheHttpClientBuilder).socketTimeout(socketTimeoutCaptor.capture());
+
Mockito.verify(spyApacheHttpClientBuilder).connectionTimeout(connectionTimeoutCaptor.capture());
+
+ Duration capturedSocketTimeout = socketTimeoutCaptor.getValue();
+ Duration capturedConnectionTimeout = connectionTimeoutCaptor.getValue();
+
+ Assert.assertEquals(
+ "The configured socket timeout should be 100 ms", 100,
capturedSocketTimeout.toMillis());
+ Assert.assertEquals(
+ "The configured connection timeout should be 200 ms",
+ 200,
+ capturedConnectionTimeout.toMillis());
+ }
+
+ @Test
+ public void testApacheConnectionTimeoutConfiguration() { // only the connection timeout set: socketTimeout must not be called
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS,
"200");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
+ ApacheHttpClient.Builder spyApacheHttpClientBuilder =
Mockito.spy(apacheHttpClientBuilder); // spy on a real builder so the calls made to it can be verified
+ ArgumentCaptor<Duration> connectionTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class);
+ ArgumentCaptor<Duration> socketTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class); // NOTE(review): only used as a matcher below; its value is never read
+
+ awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+
Mockito.verify(spyApacheHttpClientBuilder).connectionTimeout(connectionTimeoutCaptor.capture());
+ Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+ .socketTimeout(socketTimeoutCaptor.capture());
+
+ Duration capturedConnectionTimeout = connectionTimeoutCaptor.getValue();
+
+ Assert.assertEquals(
+ "The configured connection timeout should be 200 ms",
+ 200,
+ capturedConnectionTimeout.toMillis());
+ }
+
+ @Test
+ public void testApacheSocketTimeoutConfiguration() { // only the socket timeout set: connectionTimeout must not be called
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS, "100");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
+ ApacheHttpClient.Builder spyApacheHttpClientBuilder =
Mockito.spy(apacheHttpClientBuilder); // spy on a real builder so the calls made to it can be verified
+ ArgumentCaptor<Duration> connectionTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class); // NOTE(review): only used as a matcher below; its value is never read
+ ArgumentCaptor<Duration> socketTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class);
+
+ awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+ Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+ .connectionTimeout(connectionTimeoutCaptor.capture());
+
Mockito.verify(spyApacheHttpClientBuilder).socketTimeout(socketTimeoutCaptor.capture());
+
+ Duration capturedSocketTimeout = socketTimeoutCaptor.getValue();
+
+ Assert.assertEquals(
+ "The configured socket timeout should be 100 ms", 100,
capturedSocketTimeout.toMillis());
+ }
+
+ @Test
+ public void testApacheDefaultConfiguration() { // no timeout properties set: neither timeout setter may be called
+ Map<String, String> properties = Maps.newHashMap(); // intentionally left empty
+ AwsProperties awsProperties = new AwsProperties(properties);
+ ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
+ ApacheHttpClient.Builder spyApacheHttpClientBuilder =
Mockito.spy(apacheHttpClientBuilder); // spy on a real builder so the absence of calls can be verified
+ ArgumentCaptor<Duration> connectionTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class); // NOTE(review): only used as a matcher below; its value is never read
+ ArgumentCaptor<Duration> socketTimeoutCaptor =
ArgumentCaptor.forClass(Duration.class); // NOTE(review): only used as a matcher below; its value is never read
+
+ awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+ Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+ .connectionTimeout(connectionTimeoutCaptor.capture());
+ Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+ .socketTimeout(socketTimeoutCaptor.capture());
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index b2acf16d57..8797788b2b 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -66,6 +66,14 @@ public class PropertyUtil {
return defaultValue;
}
+ public static Long propertyAsNullableLong(Map<String, String> properties,
String property) { // returns the property parsed as a long, or null when the map has no value for the key
+ String value = properties.get(property);
+ if (value != null) {
+ return Long.parseLong(value); // Long.parseLong throws NumberFormatException for a non-numeric value; it is not caught here
+ }
+ return null; // no default is substituted; callers must handle null
+ }
+
public static String propertyAsString(
Map<String, String> properties, String property, String defaultValue) {
String value = properties.get(property);