This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c3443f65d4 AWS: Add socket connection timeout for Apache Http Builder (#5787)
c3443f65d4 is described below

commit c3443f65d4e708c77602efc1bf91348eb80f48b9
Author: Rushan Jiang <[email protected]>
AuthorDate: Sat Sep 24 19:12:01 2022 -0400

    AWS: Add socket connection timeout for Apache Http Builder (#5787)
---
 .../java/org/apache/iceberg/aws/AwsProperties.java |  45 ++++++-
 .../org/apache/iceberg/aws/TestAwsProperties.java  | 140 +++++++++++++++++++++
 .../java/org/apache/iceberg/util/PropertyUtil.java |   8 ++
 3 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 8ac11a04e0..639f61d934 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.aws;
 
 import java.io.Serializable;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -27,6 +28,7 @@ import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
 import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -360,6 +362,28 @@ public class AwsProperties implements Serializable {
 
   public static final String HTTP_CLIENT_TYPE_DEFAULT = HTTP_CLIENT_TYPE_URLCONNECTION;
 
+  /**
+   * Used to configure the connection timeout in milliseconds for {@link
+   * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when {@link
+   * #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+   */
+  public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS =
+      "http-client.apache.connection-timeout-ms";
+
+  /**
+   * Used to configure the socket timeout in milliseconds for {@link
+   * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when {@link
+   * #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+   */
+  public static final String HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS =
+      "http-client.apache.socket-timeout-ms";
+
   /**
    * Used by {@link S3FileIO} to tag objects when writing. To set, we can pass a catalog property.
    *
@@ -450,6 +474,8 @@ public class AwsProperties implements Serializable {
   public static final String LAKE_FORMATION_DB_NAME = "lakeformation.db-name";
 
   private String httpClientType;
+  private Long httpClientApacheConnectionTimeoutMs;
+  private Long httpClientApacheSocketTimeoutMs;
   private final Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags;
 
   private String clientAssumeRoleArn;
@@ -494,6 +520,8 @@ public class AwsProperties implements Serializable {
 
   public AwsProperties() {
     this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
+    this.httpClientApacheConnectionTimeoutMs = null;
+    this.httpClientApacheSocketTimeoutMs = null;
     this.stsClientAssumeRoleTags = Sets.newHashSet();
 
     this.clientAssumeRoleArn = null;
@@ -545,6 +573,10 @@ public class AwsProperties implements Serializable {
   public AwsProperties(Map<String, String> properties) {
     this.httpClientType =
         PropertyUtil.propertyAsString(properties, HTTP_CLIENT_TYPE, HTTP_CLIENT_TYPE_DEFAULT);
+    this.httpClientApacheConnectionTimeoutMs =
+        PropertyUtil.propertyAsNullableLong(properties, HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS);
+    this.httpClientApacheSocketTimeoutMs =
+        PropertyUtil.propertyAsNullableLong(properties, HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS);
     this.stsClientAssumeRoleTags = toStsTags(properties, CLIENT_ASSUME_ROLE_TAGS_PREFIX);
 
     this.clientAssumeRoleArn = properties.get(CLIENT_ASSUME_ROLE_ARN);
@@ -902,7 +934,8 @@ public class AwsProperties implements Serializable {
         builder.httpClientBuilder(UrlConnectionHttpClient.builder());
         break;
       case HTTP_CLIENT_TYPE_APACHE:
-        builder.httpClientBuilder(ApacheHttpClient.builder());
+        builder.httpClientBuilder(
+            ApacheHttpClient.builder().applyMutation(this::configureApacheHttpClientBuilder));
         break;
       default:
         throw new IllegalArgumentException("Unrecognized HTTP client type " + httpClientType);
@@ -990,4 +1023,14 @@ public class AwsProperties implements Serializable {
       builder.endpointOverride(URI.create(endpoint));
     }
   }
+
+  @VisibleForTesting
+  <T extends ApacheHttpClient.Builder> void configureApacheHttpClientBuilder(T builder) {
+    if (httpClientApacheConnectionTimeoutMs != null) {
+      builder.connectionTimeout(Duration.ofMillis(httpClientApacheConnectionTimeoutMs));
+    }
+    if (httpClientApacheSocketTimeoutMs != null) {
+      builder.socketTimeout(Duration.ofMillis(httpClientApacheSocketTimeoutMs));
+    }
+  }
 }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
index 1cb705c37f..bd3b54fd99 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.aws;
 
+import java.time.Duration;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -29,6 +30,10 @@ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
 
@@ -204,4 +209,139 @@ public class TestAwsProperties {
         "secret",
         capturedAwsCredentialsProvider.resolveCredentials().secretAccessKey());
   }
+
+  @Test
+  public void testUrlHttpClientConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_TYPE, "urlconnection");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+    ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
+        ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+
+    awsProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
+    Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
+    SdkHttpClient.Builder capturedHttpClientBuilder = httpClientBuilderCaptor.getValue();
+
+    Assert.assertTrue(
+        "Should use url connection http client",
+        capturedHttpClientBuilder instanceof UrlConnectionHttpClient.Builder);
+  }
+
+  @Test
+  public void testApacheHttpClientConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_TYPE, "apache");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+    ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
+        ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+
+    awsProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
+    Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
+    SdkHttpClient.Builder capturedHttpClientBuilder = httpClientBuilderCaptor.getValue();
+    Assert.assertTrue(
+        "Should use apache http client",
+        capturedHttpClientBuilder instanceof ApacheHttpClient.Builder);
+  }
+
+  @Test
+  public void testInvalidHttpClientType() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_TYPE, "test");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    S3ClientBuilder s3ClientBuilder = S3Client.builder();
+
+    AssertHelpers.assertThrows(
+        "should not support http client types other than urlconnection and apache",
+        IllegalArgumentException.class,
+        "Unrecognized HTTP client type",
+        () -> awsProperties.applyHttpClientConfigurations(s3ClientBuilder));
+  }
+
+  @Test
+  public void testApacheConnectionSocketTimeoutConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS, "100");
+    properties.put(AwsProperties.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS, "200");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
+    ApacheHttpClient.Builder spyApacheHttpClientBuilder = Mockito.spy(apacheHttpClientBuilder);
+    ArgumentCaptor<Duration> socketTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+    ArgumentCaptor<Duration> connectionTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+
+    awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+    Mockito.verify(spyApacheHttpClientBuilder).socketTimeout(socketTimeoutCaptor.capture());
+    Mockito.verify(spyApacheHttpClientBuilder).connectionTimeout(connectionTimeoutCaptor.capture());
+
+    Duration capturedSocketTimeout = socketTimeoutCaptor.getValue();
+    Duration capturedConnectionTimeout = connectionTimeoutCaptor.getValue();
+
+    Assert.assertEquals(
+        "The configured socket timeout should be 100 ms", 100, capturedSocketTimeout.toMillis());
+    Assert.assertEquals(
+        "The configured connection timeout should be 200 ms",
+        200,
+        capturedConnectionTimeout.toMillis());
+  }
+
+  @Test
+  public void testApacheConnectionTimeoutConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS, "200");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
+    ApacheHttpClient.Builder spyApacheHttpClientBuilder = Mockito.spy(apacheHttpClientBuilder);
+    ArgumentCaptor<Duration> connectionTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+    ArgumentCaptor<Duration> socketTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+
+    awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+    Mockito.verify(spyApacheHttpClientBuilder).connectionTimeout(connectionTimeoutCaptor.capture());
+    Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+        .socketTimeout(socketTimeoutCaptor.capture());
+
+    Duration capturedConnectionTimeout = connectionTimeoutCaptor.getValue();
+
+    Assert.assertEquals(
+        "The configured connection timeout should be 200 ms",
+        200,
+        capturedConnectionTimeout.toMillis());
+  }
+
+  @Test
+  public void testApacheSocketTimeoutConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS, "100");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
+    ApacheHttpClient.Builder spyApacheHttpClientBuilder = Mockito.spy(apacheHttpClientBuilder);
+    ArgumentCaptor<Duration> connectionTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+    ArgumentCaptor<Duration> socketTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+
+    awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+    Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+        .connectionTimeout(connectionTimeoutCaptor.capture());
+    Mockito.verify(spyApacheHttpClientBuilder).socketTimeout(socketTimeoutCaptor.capture());
+
+    Duration capturedSocketTimeout = socketTimeoutCaptor.getValue();
+
+    Assert.assertEquals(
+        "The configured socket timeout should be 100 ms", 100, capturedSocketTimeout.toMillis());
+  }
+
+  @Test
+  public void testApacheDefaultConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    AwsProperties awsProperties = new AwsProperties(properties);
+    ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
+    ApacheHttpClient.Builder spyApacheHttpClientBuilder = Mockito.spy(apacheHttpClientBuilder);
+    ArgumentCaptor<Duration> connectionTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+    ArgumentCaptor<Duration> socketTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
+
+    awsProperties.configureApacheHttpClientBuilder(spyApacheHttpClientBuilder);
+    Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+        .connectionTimeout(connectionTimeoutCaptor.capture());
+    Mockito.verify(spyApacheHttpClientBuilder, Mockito.never())
+        .socketTimeout(socketTimeoutCaptor.capture());
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index b2acf16d57..8797788b2b 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -66,6 +66,14 @@ public class PropertyUtil {
     return defaultValue;
   }
 
+  public static Long propertyAsNullableLong(Map<String, String> properties, String property) {
+    String value = properties.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }
+    return null;
+  }
+
   public static String propertyAsString(
       Map<String, String> properties, String property, String defaultValue) {
     String value = properties.get(property);

Reply via email to