This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 528b9b336c Core: Allow configuring socket/connection timeout in HTTPClient (#10053)
528b9b336c is described below

commit 528b9b336c1d6e8051b46ee48f0388769f5eda55
Author: Harish Chandrasekaran <[email protected]>
AuthorDate: Thu Apr 11 03:31:41 2024 -0700

    Core: Allow configuring socket/connection timeout in HTTPClient (#10053)
---
 .../java/org/apache/iceberg/rest/HTTPClient.java   | 69 +++++++++++++++++----
 .../org/apache/iceberg/rest/TestHTTPClient.java    | 70 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index 9366630954..13f96a6b21 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -26,10 +26,12 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.config.ConnectionConfig;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
 import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
@@ -79,6 +81,11 @@ public class HTTPClient implements RESTClient {
   private static final String REST_MAX_CONNECTIONS_PER_ROUTE = "rest.client.connections-per-route";
   private static final int REST_MAX_CONNECTIONS_PER_ROUTE_DEFAULT = 100;
 
+  @VisibleForTesting
+  static final String REST_CONNECTION_TIMEOUT_MS = "rest.client.connection-timeout-ms";
+
+  @VisibleForTesting static final String REST_SOCKET_TIMEOUT_MS = "rest.client.socket-timeout-ms";
+
   private final String uri;
   private final CloseableHttpClient httpClient;
   private final ObjectMapper mapper;
@@ -88,22 +95,13 @@ public class HTTPClient implements RESTClient {
       Map<String, String> baseHeaders,
       ObjectMapper objectMapper,
       HttpRequestInterceptor requestInterceptor,
-      Map<String, String> properties) {
+      Map<String, String> properties,
+      HttpClientConnectionManager connectionManager) {
     this.uri = uri;
     this.mapper = objectMapper;
 
     HttpClientBuilder clientBuilder = HttpClients.custom();
 
-    HttpClientConnectionManager connectionManager =
-        PoolingHttpClientConnectionManagerBuilder.create()
-            .useSystemProperties()
-            .setMaxConnTotal(Integer.getInteger(REST_MAX_CONNECTIONS, REST_MAX_CONNECTIONS_DEFAULT))
-            .setMaxConnPerRoute(
-                PropertyUtil.propertyAsInt(
-                    properties,
-                    REST_MAX_CONNECTIONS_PER_ROUTE,
-                    REST_MAX_CONNECTIONS_PER_ROUTE_DEFAULT))
-            .build();
     clientBuilder.setConnectionManager(connectionManager);
 
     if (baseHeaders != null) {
@@ -448,6 +446,47 @@ public class HTTPClient implements RESTClient {
     return instance;
   }
 
+  static HttpClientConnectionManager configureConnectionManager(Map<String, String> properties) {
+    PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder =
+        PoolingHttpClientConnectionManagerBuilder.create();
+    ConnectionConfig connectionConfig = configureConnectionConfig(properties);
+    if (connectionConfig != null) {
+      connectionManagerBuilder.setDefaultConnectionConfig(connectionConfig);
+    }
+
+    return connectionManagerBuilder
+        .useSystemProperties()
+        .setMaxConnTotal(Integer.getInteger(REST_MAX_CONNECTIONS, REST_MAX_CONNECTIONS_DEFAULT))
+        .setMaxConnPerRoute(
+            PropertyUtil.propertyAsInt(
+                properties, REST_MAX_CONNECTIONS_PER_ROUTE, REST_MAX_CONNECTIONS_PER_ROUTE_DEFAULT))
+        .build();
+  }
+
+  @VisibleForTesting
+  static ConnectionConfig configureConnectionConfig(Map<String, String> properties) {
+    Long connectionTimeoutMillis =
+        PropertyUtil.propertyAsNullableLong(properties, REST_CONNECTION_TIMEOUT_MS);
+    Integer socketTimeoutMillis =
+        PropertyUtil.propertyAsNullableInt(properties, REST_SOCKET_TIMEOUT_MS);
+
+    if (connectionTimeoutMillis == null && socketTimeoutMillis == null) {
+      return null;
+    }
+
+    ConnectionConfig.Builder connConfigBuilder = ConnectionConfig.custom();
+
+    if (connectionTimeoutMillis != null) {
+      connConfigBuilder.setConnectTimeout(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    if (socketTimeoutMillis != null) {
+      connConfigBuilder.setSocketTimeout(socketTimeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    return connConfigBuilder.build();
+  }
+
   public static Builder builder(Map<String, String> properties) {
     return new Builder(properties);
   }
@@ -493,7 +532,13 @@ public class HTTPClient implements RESTClient {
         interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties);
       }
 
-      return new HTTPClient(uri, baseHeaders, mapper, interceptor, properties);
+      return new HTTPClient(
+          uri,
+          baseHeaders,
+          mapper,
+          interceptor,
+          properties,
+          configureConnectionManager(properties));
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
index e596df43e6..93585cdbb5 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
@@ -31,10 +31,13 @@ import static org.mockserver.model.HttpResponse.response;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.hc.client5.http.config.ConnectionConfig;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequestInterceptor;
@@ -47,6 +50,8 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpResponse;
@@ -133,6 +138,71 @@ public class TestHTTPClient {
     assertThat(((TestHttpRequestInterceptor) interceptor).properties).isEqualTo(properties);
   }
 
+  @Test
+  public void testSocketAndConnectionTimeoutSet() {
+    long connectionTimeoutMs = 10L;
+    int socketTimeoutMs = 10;
+    Map<String, String> properties =
+        ImmutableMap.of(
+            HTTPClient.REST_CONNECTION_TIMEOUT_MS, String.valueOf(connectionTimeoutMs),
+            HTTPClient.REST_SOCKET_TIMEOUT_MS, String.valueOf(socketTimeoutMs));
+
+    ConnectionConfig connectionConfig = HTTPClient.configureConnectionConfig(properties);
+    assertThat(connectionConfig).isNotNull();
+    assertThat(connectionConfig.getConnectTimeout().getDuration()).isEqualTo(connectionTimeoutMs);
+    assertThat(connectionConfig.getSocketTimeout().getDuration()).isEqualTo(socketTimeoutMs);
+  }
+
+  @Test
+  public void testSocketTimeout() throws IOException {
+    long socketTimeoutMs = 2000L;
+    Map<String, String> properties =
+        ImmutableMap.of(HTTPClient.REST_SOCKET_TIMEOUT_MS, String.valueOf(socketTimeoutMs));
+    String path = "socket/timeout/path";
+
+    try (HTTPClient client = HTTPClient.builder(properties).uri(URI).build()) {
+      HttpRequest mockRequest =
+          request()
+              .withPath("/" + path)
+              .withMethod(HttpMethod.HEAD.name().toUpperCase(Locale.ROOT));
+      // Setting a response delay of 5 seconds to simulate hitting the configured socket timeout of
+      // 2 seconds
+      HttpResponse mockResponse =
+          response()
+              .withStatusCode(200)
+              .withBody("Delayed response")
+              .withDelay(TimeUnit.MILLISECONDS, 5000);
+      mockServer.when(mockRequest).respond(mockResponse);
+
+      Assertions.assertThatThrownBy(() -> client.head(path, ImmutableMap.of(), (unused) -> {}))
+          .cause()
+          .isInstanceOf(SocketTimeoutException.class)
+          .hasMessage("Read timed out");
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {HTTPClient.REST_CONNECTION_TIMEOUT_MS, HTTPClient.REST_SOCKET_TIMEOUT_MS})
+  public void testInvalidTimeout(String timeoutMsType) {
+    String invalidTimeoutMs = "invalidMs";
+    Assertions.assertThatThrownBy(
+            () ->
+                HTTPClient.builder(ImmutableMap.of(timeoutMsType, invalidTimeoutMs))
+                    .uri(URI)
+                    .build())
+        .isInstanceOf(NumberFormatException.class)
+        .hasMessage(String.format("For input string: \"%s\"", invalidTimeoutMs));
+
+    String invalidNegativeTimeoutMs = "-1";
+    Assertions.assertThatThrownBy(
+            () ->
+                HTTPClient.builder(ImmutableMap.of(timeoutMsType, invalidNegativeTimeoutMs))
+                    .uri(URI)
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("duration must not be negative: %s", invalidNegativeTimeoutMs));
+  }
+
   public static void testHttpMethodOnSuccess(HttpMethod method) throws JsonProcessingException {
     Item body = new Item(0L, "hank");
     int statusCode = 200;

Reply via email to