This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 4d9320df36 AWS: Configure builder for reuse of http connection pool in
SDKv2 (#14161) (#14774)
4d9320df36 is described below
commit 4d9320df367e371b81fee3a7f7db4f7980bcf832
Author: Anurag Mantripragada <[email protected]>
AuthorDate: Fri Dec 5 17:27:01 2025 -0800
AWS: Configure builder for reuse of http connection pool in SDKv2 (#14161)
(#14774)
AWS: Configure builder for reuse of http clients in SDKv2
---
.../aws/ApacheHttpClientConfigurations.java | 39 ++-
.../iceberg/aws/BaseHttpClientConfigurations.java | 75 ++++++
.../org/apache/iceberg/aws/HttpClientCache.java | 203 +++++++++++++++
.../aws/UrlConnectionHttpClientConfigurations.java | 32 ++-
.../apache/iceberg/aws/TestHttpClientCache.java | 276 +++++++++++++++++++++
.../iceberg/aws/TestHttpClientProperties.java | 67 +++--
6 files changed, 668 insertions(+), 24 deletions(-)
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
index 95fe34b742..3445928d15 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
+++
b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
@@ -21,13 +21,16 @@ package org.apache.iceberg.aws;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
-class ApacheHttpClientConfigurations {
+class ApacheHttpClientConfigurations extends BaseHttpClientConfigurations {
private Long connectionTimeoutMs;
private Long socketTimeoutMs;
private Long acquisitionTimeoutMs;
@@ -41,10 +44,11 @@ class ApacheHttpClientConfigurations {
private ApacheHttpClientConfigurations() {}
- public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T
awsClientBuilder) {
- ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
+ @Override
+ protected SdkHttpClient buildHttpClient() {
+ final ApacheHttpClient.Builder apacheHttpClientBuilder =
ApacheHttpClient.builder();
configureApacheHttpClientBuilder(apacheHttpClientBuilder);
- awsClientBuilder.httpClientBuilder(apacheHttpClientBuilder);
+ return apacheHttpClientBuilder.build();
}
private void initialize(Map<String, String> httpClientProperties) {
@@ -115,6 +119,31 @@ class ApacheHttpClientConfigurations {
}
}
+  /**
+   * Builds the cache key that identifies this Apache HTTP client configuration.
+   *
+   * <p>The key lists every setting below as {@code name=value}, sorted by name, so two
+   * configurations with identical settings produce the same key and share one HTTP client
+   * instance.
+   */
+  @Override
+  protected String generateHttpClientCacheKey() {
+    // A sorted map keeps the component order, and therefore the resulting key, stable
+    Map<String, Object> settings = Maps.newTreeMap();
+
+    settings.put("type", "apache");
+    settings.put("acquisitionTimeoutMs", acquisitionTimeoutMs);
+    settings.put("connectionMaxIdleTimeMs", connectionMaxIdleTimeMs);
+    settings.put("connectionTimeToLiveMs", connectionTimeToLiveMs);
+    settings.put("connectionTimeoutMs", connectionTimeoutMs);
+    settings.put("expectContinueEnabled", expectContinueEnabled);
+    settings.put("maxConnections", maxConnections);
+    settings.put("proxyEndpoint", proxyEndpoint);
+    settings.put("socketTimeoutMs", socketTimeoutMs);
+    settings.put("tcpKeepAliveEnabled", tcpKeepAliveEnabled);
+    settings.put("useIdleConnectionReaperEnabled", useIdleConnectionReaperEnabled);
+
+    // Unset (null) settings are rendered as the literal text "null"
+    return settings.entrySet().stream()
+        .map(
+            setting ->
+                String.join("=", setting.getKey(), Objects.toString(setting.getValue(), "null")))
+        .collect(Collectors.joining(",", "apache[", "]"));
+  }
+
public static ApacheHttpClientConfigurations create(Map<String, String>
properties) {
ApacheHttpClientConfigurations configurations = new
ApacheHttpClientConfigurations();
configurations.initialize(properties);
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
new file mode 100644
index 0000000000..d4301f0448
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * Common parent of the HTTP client configuration classes. It hands AWS client builders an HTTP
+ * client whose lifecycle is managed by {@link HttpClientCache} through reference counting.
+ *
+ * <p>Subclasses only describe their configuration: they supply a cache key for it and know how to
+ * build the matching HTTP client (Apache, UrlConnection, etc.). Looking the client up in the
+ * cache and sharing it is done here.
+ */
+abstract class BaseHttpClientConfigurations {
+
+  private static final HttpClientCache SHARED_CLIENTS = HttpClientCache.instance();
+
+  /**
+   * Produce the cache key for this HTTP client configuration. Configurations that return the same
+   * key are given the same shared HTTP client.
+   *
+   * <p>Implementations should fold in every parameter that influences HTTP client behavior
+   * (timeouts, connection settings, proxy configuration, etc.) so that a client is only shared
+   * between identical configurations.
+   *
+   * @return a string that uniquely represents this HTTP client configuration
+   */
+  protected abstract String generateHttpClientCacheKey();
+
+  /**
+   * Create the HTTP client described by this configuration. Invoked only when the cache holds no
+   * client for the key returned by {@link #generateHttpClientCacheKey()}.
+   *
+   * @return a configured {@link SdkHttpClient} instance
+   */
+  protected abstract SdkHttpClient buildHttpClient();
+
+  /**
+   * Set a managed, shared HTTP client on the given AWS client builder.
+   *
+   * <p>The client is looked up in the cache under this configuration's cache key. An existing
+   * client is reused and its reference count incremented; otherwise a new client is built via
+   * {@link #buildHttpClient()} and cached.
+   *
+   * @param awsClientBuilder the AWS client builder to configure
+   * @param <T> the type of AWS client builder
+   */
+  public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T awsClientBuilder) {
+    awsClientBuilder.httpClient(
+        SHARED_CLIENTS.getOrCreateClient(generateHttpClientCacheKey(), this::buildHttpClient));
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
new file mode 100644
index 0000000000..79444a62ae
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ExecutableHttpRequest;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * A cache that manages the lifecycle of shared HTTP clients for AWS SDK v2 using reference
+ * counting. Package-private - only accessed via {@link BaseHttpClientConfigurations}.
+ *
+ * <p>Look-up, creation and reference acquisition run inside a single atomic map operation, and so
+ * do release and removal. A caller therefore never receives a wrapper that a concurrent release
+ * is in the middle of closing.
+ */
+final class HttpClientCache {
+  private static final Logger LOG = LoggerFactory.getLogger(HttpClientCache.class);
+
+  private final ConcurrentMap<String, ManagedHttpClient> clients = Maps.newConcurrentMap();
+  private static volatile HttpClientCache instance;
+
+  /** Returns the process-wide cache, creating it on first use. */
+  static HttpClientCache instance() {
+    if (instance == null) {
+      synchronized (HttpClientCache.class) {
+        if (instance == null) {
+          instance = new HttpClientCache();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * Get or create a managed HTTP client for the given configuration. Each call increments the
+   * reference count for the client and returns a ref-counted wrapper.
+   *
+   * <p>The factory runs while the map entry for {@code clientKey} is locked, so it must not call
+   * back into this cache.
+   *
+   * @param clientKey unique key identifying the client configuration
+   * @param clientFactory factory to create the HTTP client if not cached
+   * @return a ref-counted HTTP client wrapper
+   */
+  SdkHttpClient getOrCreateClient(String clientKey, Supplier<SdkHttpClient> clientFactory) {
+    return clients.compute(
+        clientKey,
+        (key, cached) -> {
+          // Taking the reference under the entry lock means releaseClient cannot close the
+          // wrapper between the look-up and the acquire
+          if (cached != null && cached.tryAcquire()) {
+            return cached;
+          }
+
+          // No entry, or a closed wrapper was left behind: replace it with a fresh client
+          LOG.debug("Creating new managed HTTP client for key: {}", key);
+          ManagedHttpClient created = new ManagedHttpClient(clientFactory.get(), key);
+          return created.acquire();
+        });
+  }
+
+  /**
+   * Release a reference to the HTTP client. When the reference count reaches zero, the client is
+   * closed and removed from the cache.
+   *
+   * @param clientKey the key identifying the client to release
+   */
+  void releaseClient(String clientKey) {
+    releaseIfCurrent(clientKey, null);
+  }
+
+  /**
+   * Release one reference on the client cached under {@code clientKey}, closing it and dropping
+   * the map entry in the same atomic step once no references remain.
+   *
+   * @param clientKey the key identifying the client to release
+   * @param expected when non-null, the release only happens if this exact wrapper is still the
+   *     cached one; a stale wrapper must not release a reference held on its replacement
+   */
+  private void releaseIfCurrent(String clientKey, ManagedHttpClient expected) {
+    clients.computeIfPresent(
+        clientKey,
+        (key, cached) -> {
+          if (expected != null && cached != expected) {
+            return cached;
+          }
+
+          cached.release();
+          // Returning null removes the entry; a closed wrapper must never stay in the map
+          return cached.isClosed() ? null : cached;
+        });
+  }
+
+  @VisibleForTesting
+  Map<String, ManagedHttpClient> clients() {
+    return Collections.unmodifiableMap(clients);
+  }
+
+  /**
+   * Remove every cached client and close its underlying HTTP client, regardless of how many
+   * references are still outstanding.
+   */
+  @VisibleForTesting
+  void clear() {
+    for (String key : clients.keySet()) {
+      ManagedHttpClient removed = clients.remove(key);
+      if (removed != null) {
+        removed.forceClose();
+      }
+    }
+  }
+
+  /**
+   * Managed HTTP client wrapper that provides reference counting for lifecycle management. The
+   * HTTP client is closed when the reference count reaches zero.
+   *
+   * <p>All acquirers of a key share this one wrapper instance, so each acquirer must call {@link
+   * #close()} at most once.
+   */
+  static class ManagedHttpClient implements SdkHttpClient {
+    private final SdkHttpClient httpClient;
+    private final String clientKey;
+    // Written only under this object's monitor; volatile so the unsynchronized test accessors
+    // observe current values
+    private volatile int refCount = 0;
+    private volatile boolean closed = false;
+
+    ManagedHttpClient(SdkHttpClient httpClient, String clientKey) {
+      this.httpClient = httpClient;
+      this.clientKey = clientKey;
+      LOG.debug("Created managed HTTP client: key={}", clientKey);
+    }
+
+    /**
+     * Acquire a reference to the HTTP client, incrementing the reference count.
+     *
+     * @return the ref-counted wrapper client
+     * @throws IllegalStateException if the client has already been closed
+     */
+    synchronized ManagedHttpClient acquire() {
+      if (!tryAcquire()) {
+        throw new IllegalStateException("Cannot acquire closed HTTP client: " + clientKey);
+      }
+      return this;
+    }
+
+    /**
+     * Increment the reference count unless the client has already been closed.
+     *
+     * @return true if a reference was taken, false if the client is closed
+     */
+    private synchronized boolean tryAcquire() {
+      if (closed) {
+        return false;
+      }
+      refCount++;
+      LOG.debug("Acquired HTTP client: key={}, refCount={}", clientKey, refCount);
+      return true;
+    }
+
+    /**
+     * Release a reference to the HTTP client, decrementing the reference count. If the count
+     * reaches zero, the client is closed.
+     *
+     * @return true if the client was closed, false otherwise
+     */
+    synchronized boolean release() {
+      if (closed) {
+        LOG.warn("Attempted to release already closed HTTP client: key={}", clientKey);
+        return false;
+      }
+
+      refCount--;
+      LOG.debug("Released HTTP client: key={}, refCount={}", clientKey, refCount);
+      if (refCount == 0) {
+        return closeHttpClient();
+      } else if (refCount < 0) {
+        // More releases than acquires: clamp so later acquire/release pairs stay balanced
+        LOG.warn(
+            "HTTP client reference count went negative key={}, refCount={}", clientKey, refCount);
+        refCount = 0;
+      }
+      return false;
+    }
+
+    /** Close the underlying HTTP client now, discarding any outstanding references. */
+    private synchronized void forceClose() {
+      refCount = 0;
+      closeHttpClient();
+    }
+
+    @VisibleForTesting
+    SdkHttpClient httpClient() {
+      return httpClient;
+    }
+
+    /**
+     * Close the HTTP client if not already closed. A failure to close is logged, not rethrown.
+     *
+     * @return true if the client was closed by this call, false if already closed
+     */
+    private boolean closeHttpClient() {
+      if (!closed) {
+        closed = true;
+        LOG.debug("Closing HTTP client: key={}", clientKey);
+        try {
+          httpClient.close();
+        } catch (Exception e) {
+          LOG.error("Failed to close HTTP client: key={}", clientKey, e);
+        }
+        return true;
+      }
+      return false;
+    }
+
+    @VisibleForTesting
+    int refCount() {
+      return refCount;
+    }
+
+    @VisibleForTesting
+    boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
+      return httpClient.prepareRequest(request);
+    }
+
+    @Override
+    public String clientName() {
+      return httpClient.clientName();
+    }
+
+    /**
+     * Give back one reference through the cache. The underlying HTTP client is only closed once
+     * the last reference is released. Does nothing if this wrapper is no longer the cached one
+     * for its key.
+     */
+    @Override
+    public void close() {
+      HttpClientCache.instance().releaseIfCurrent(clientKey, this);
+    }
+  }
+}
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
index ff8dafcf06..273baa6748 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
+++
b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
@@ -21,13 +21,16 @@ package org.apache.iceberg.aws;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.ProxyConfiguration;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-class UrlConnectionHttpClientConfigurations {
+class UrlConnectionHttpClientConfigurations extends
BaseHttpClientConfigurations {
private Long httpClientUrlConnectionConnectionTimeoutMs;
private Long httpClientUrlConnectionSocketTimeoutMs;
@@ -35,11 +38,12 @@ class UrlConnectionHttpClientConfigurations {
private UrlConnectionHttpClientConfigurations() {}
- public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T
awsClientBuilder) {
- UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
+ @Override
+ protected SdkHttpClient buildHttpClient() {
+ final UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
UrlConnectionHttpClient.builder();
configureUrlConnectionHttpClientBuilder(urlConnectionHttpClientBuilder);
- awsClientBuilder.httpClientBuilder(urlConnectionHttpClientBuilder);
+ return urlConnectionHttpClientBuilder.build();
}
private void initialize(Map<String, String> httpClientProperties) {
@@ -71,6 +75,24 @@ class UrlConnectionHttpClientConfigurations {
}
}
+  /**
+   * Builds the cache key that identifies this UrlConnection HTTP client configuration.
+   *
+   * <p>The key lists every setting below as {@code name=value}, sorted by name, so two
+   * configurations with identical settings produce the same key and share one HTTP client
+   * instance.
+   */
+  @Override
+  protected String generateHttpClientCacheKey() {
+    // A sorted map keeps the component order, and therefore the resulting key, stable
+    Map<String, Object> settings = Maps.newTreeMap();
+
+    settings.put("type", "urlconnection");
+    settings.put("connectionTimeoutMs", httpClientUrlConnectionConnectionTimeoutMs);
+    settings.put("proxyEndpoint", proxyEndpoint);
+    settings.put("socketTimeoutMs", httpClientUrlConnectionSocketTimeoutMs);
+
+    // Unset (null) settings are rendered as the literal text "null"
+    return settings.entrySet().stream()
+        .map(
+            setting ->
+                String.join("=", setting.getKey(), Objects.toString(setting.getValue(), "null")))
+        .collect(Collectors.joining(",", "urlconnection[", "]"));
+  }
+
public static UrlConnectionHttpClientConfigurations create(
Map<String, String> httpClientProperties) {
UrlConnectionHttpClientConfigurations configurations =
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
new file mode 100644
index 0000000000..9febf37cbd
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+public class TestHttpClientCache {
+
+  @Mock private SdkHttpClient httpClient1;
+  @Mock private SdkHttpClient httpClient2;
+  @Mock private Supplier<SdkHttpClient> httpClientFactory1;
+  @Mock private Supplier<SdkHttpClient> httpClientFactory2;
+
+  private HttpClientCache cache;
+
+  @BeforeEach
+  public void before() {
+    MockitoAnnotations.openMocks(this);
+    when(httpClientFactory1.get()).thenReturn(httpClient1);
+    when(httpClientFactory2.get()).thenReturn(httpClient2);
+
+    // The cache is a shared singleton, so drop whatever earlier tests left behind
+    cache = HttpClientCache.instance();
+    cache.clear();
+  }
+
+  /** Wraps the given delegate in a stand-alone managed client that is not put in the cache. */
+  private static ManagedHttpClient newManagedClient(SdkHttpClient delegate) {
+    return new ManagedHttpClient(delegate, "test-key");
+  }
+
+  @Test
+  public void singletonPattern() {
+    assertThat(HttpClientCache.instance()).isSameAs(HttpClientCache.instance());
+  }
+
+  @Test
+  public void clientCaching() {
+    String key = "test-key";
+
+    // The first request builds the client through the factory
+    SdkHttpClient first = cache.getOrCreateClient(key, httpClientFactory1);
+    verify(httpClientFactory1, times(1)).get();
+
+    // The second request for the same key is served from the cache without another build
+    SdkHttpClient second = cache.getOrCreateClient(key, httpClientFactory1);
+    verify(httpClientFactory1, times(1)).get();
+
+    assertThat(first).isSameAs(second);
+
+    // One reference per request
+    assertThat(cache.clients().get(key).refCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void differentKeysCreateDifferentClients() {
+    SdkHttpClient forFirstKey = cache.getOrCreateClient("test-key-1", httpClientFactory1);
+    SdkHttpClient forSecondKey = cache.getOrCreateClient("test-key-2", httpClientFactory2);
+
+    verify(httpClientFactory1, times(1)).get();
+    verify(httpClientFactory2, times(1)).get();
+
+    assertThat(forFirstKey).isNotSameAs(forSecondKey);
+  }
+
+  @Test
+  public void referenceCountingAndCleanup() throws Exception {
+    SdkHttpClient delegate = mock(SdkHttpClient.class);
+    ManagedHttpClient managed = newManagedClient(delegate);
+
+    // Two acquisitions hand out the same wrapper and count two references
+    ManagedHttpClient firstRef = managed.acquire();
+    ManagedHttpClient secondRef = managed.acquire();
+
+    assertThat(firstRef).isSameAs(secondRef);
+    assertThat(managed.refCount()).isEqualTo(2);
+
+    // One reference is still outstanding after the first release, so nothing is closed
+    managed.release();
+    assertThat(managed.refCount()).isEqualTo(1);
+    assertThat(managed.isClosed()).isFalse();
+    verify(delegate, times(0)).close();
+
+    // Releasing the last reference closes the delegate
+    managed.release();
+    assertThat(managed.refCount()).isEqualTo(0);
+    assertThat(managed.isClosed()).isTrue();
+    verify(delegate, times(1)).close();
+  }
+
+  @Test
+  public void acquireAfterCloseThrows() {
+    ManagedHttpClient managed = newManagedClient(mock(SdkHttpClient.class));
+
+    // A single acquire followed by a release leaves the client closed
+    managed.acquire();
+    managed.release();
+
+    assertThat(managed.isClosed()).isTrue();
+
+    // A closed client can no longer be acquired
+    assertThatThrownBy(managed::acquire)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot acquire closed HTTP client");
+  }
+
+  @Test
+  public void releaseRemovesFromRegistry() {
+    String key = "test-key";
+
+    // Creating the client takes the first and only reference
+    assertThat(cache.getOrCreateClient(key, httpClientFactory1)).isNotNull();
+
+    Map<String, ManagedHttpClient> registry = cache.clients();
+    assertThat(registry).containsKey(key);
+    assertThat(registry.get(key).refCount()).isEqualTo(1);
+
+    // Dropping that reference closes the client and removes its entry
+    cache.releaseClient(key);
+
+    assertThat(registry).doesNotContainKey(key);
+    verify(httpClient1, times(1)).close();
+  }
+
+  @Test
+  public void concurrentAccess() throws InterruptedException {
+    String key = "concurrent-test-key";
+    int workerCount = 10;
+    SdkHttpClient[] obtained = new SdkHttpClient[workerCount];
+    Thread[] workers = new Thread[workerCount];
+
+    // Every worker asks the cache for the same key
+    for (int slot = 0; slot < workerCount; slot++) {
+      int target = slot;
+      workers[slot] =
+          new Thread(() -> obtained[target] = cache.getOrCreateClient(key, httpClientFactory1));
+    }
+
+    for (Thread worker : workers) {
+      worker.start();
+    }
+
+    for (Thread worker : workers) {
+      worker.join();
+    }
+
+    // The client is built exactly once even under concurrent requests
+    verify(httpClientFactory1, times(1)).get();
+
+    // Every worker received the same wrapper as the first one
+    for (int slot = 1; slot < workerCount; slot++) {
+      assertThat(obtained[slot]).isSameAs(obtained[0]);
+    }
+
+    // One reference per worker
+    assertThat(cache.clients().get(key).refCount()).isEqualTo(workerCount);
+  }
+
+  @Test
+  public void registryClear() {
+    Map<String, ManagedHttpClient> registry = cache.clients();
+
+    cache.getOrCreateClient("key1", httpClientFactory1);
+    cache.getOrCreateClient("key2", httpClientFactory2);
+
+    // Both clients are registered
+    assertThat(registry).hasSize(2);
+
+    cache.clear();
+
+    // Clearing empties the registry and closes both delegates
+    assertThat(registry).isEmpty();
+    verify(httpClient1, times(1)).close();
+    verify(httpClient2, times(1)).close();
+  }
+
+  @Test
+  public void doubleReleaseDoesNotCauseNegativeRefCount() throws Exception {
+    SdkHttpClient delegate = mock(SdkHttpClient.class);
+    ManagedHttpClient managed = newManagedClient(delegate);
+
+    managed.acquire();
+    assertThat(managed.refCount()).isEqualTo(1);
+
+    // Releasing the only reference closes the client
+    assertThat(managed.release()).isTrue();
+    assertThat(managed.refCount()).isEqualTo(0);
+    assertThat(managed.isClosed()).isTrue();
+    verify(delegate, times(1)).close();
+
+    // A further release on the closed client is a no-op: the count stays at 0 and the delegate
+    // is not closed a second time
+    assertThat(managed.release()).isFalse();
+    assertThat(managed.refCount()).isEqualTo(0);
+    assertThat(managed.isClosed()).isTrue();
+    verify(delegate, times(1)).close();
+  }
+
+  @Test
+  public void multipleReleasesAfterClose() throws Exception {
+    SdkHttpClient delegate = mock(SdkHttpClient.class);
+    ManagedHttpClient managed = newManagedClient(delegate);
+
+    managed.acquire();
+    assertThat(managed.refCount()).isEqualTo(1);
+
+    // Releasing the only reference closes the client
+    managed.release();
+    assertThat(managed.isClosed()).isTrue();
+    assertThat(managed.refCount()).isEqualTo(0);
+
+    // Extra releases (simulating a bug in caller code) report false and leave the count at 0
+    for (int attempt = 0; attempt < 5; attempt++) {
+      assertThat(managed.release()).isFalse();
+      assertThat(managed.refCount()).isEqualTo(0);
+    }
+
+    // The delegate was closed exactly once
+    verify(delegate, times(1)).close();
+  }
+}
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
index b0602a0749..378e5e6ca9 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
@@ -20,8 +20,11 @@ package org.apache.iceberg.aws;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
import java.util.Map;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -40,16 +43,21 @@ public class TestHttpClientProperties {
properties.put(HttpClientProperties.CLIENT_TYPE, "urlconnection");
HttpClientProperties httpProperties = new HttpClientProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
- ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
- ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+ ArgumentCaptor<SdkHttpClient> httpClientCaptor =
ArgumentCaptor.forClass(SdkHttpClient.class);
httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
-
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
- SdkHttpClient.Builder capturedHttpClientBuilder =
httpClientBuilderCaptor.getValue();
+ Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+ SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
- assertThat(capturedHttpClientBuilder)
- .as("Should use url connection http client")
- .isInstanceOf(UrlConnectionHttpClient.Builder.class);
+ assertThat(capturedHttpClient)
+ .as("Should use managed SDK http client")
+ .isInstanceOf(ManagedHttpClient.class);
+
+ // Verify the underlying delegate is UrlConnectionHttpClient
+ ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+ assertThat(managedClient.httpClient())
+ .as("Underlying client should be UrlConnectionHttpClient")
+ .isInstanceOf(UrlConnectionHttpClient.class);
}
@Test
@@ -58,15 +66,21 @@ public class TestHttpClientProperties {
properties.put(HttpClientProperties.CLIENT_TYPE, "apache");
HttpClientProperties httpProperties = new HttpClientProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
- ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
- ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+ ArgumentCaptor<SdkHttpClient> httpClientCaptor =
ArgumentCaptor.forClass(SdkHttpClient.class);
httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
-
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
- SdkHttpClient.Builder capturedHttpClientBuilder =
httpClientBuilderCaptor.getValue();
- assertThat(capturedHttpClientBuilder)
- .as("Should use apache http client")
- .isInstanceOf(ApacheHttpClient.Builder.class);
+ Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+ SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
+
+ assertThat(capturedHttpClient)
+ .as("Should use managed SDK http client")
+ .isInstanceOf(ManagedHttpClient.class);
+
+ // Verify the underlying delegate is ApacheHttpClient
+ ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+ assertThat(managedClient.httpClient())
+ .as("Underlying client should be ApacheHttpClient")
+ .isInstanceOf(ApacheHttpClient.class);
}
@Test
@@ -80,4 +94,29 @@ public class TestHttpClientProperties {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unrecognized HTTP client type test");
}
+
+  @Test
+  public void testApacheHttpClientConfiguredAsSharedResource() {
+    S3ClientBuilder s3Builder = Mockito.mock(S3ClientBuilder.class);
+    Map<String, String> noOverrides = Maps.newHashMap();
+
+    ApacheHttpClientConfigurations.create(noOverrides).configureHttpClientBuilder(s3Builder);
+
+    // The builder must be handed a managed client through httpClient(), i.e. a shared resource
+    verify(s3Builder).httpClient(any(ManagedHttpClient.class));
+  }
+
+  @Test
+  public void testUrlConnectionHttpClientConfiguredAsSharedResource() {
+    S3ClientBuilder s3Builder = Mockito.mock(S3ClientBuilder.class);
+    Map<String, String> noOverrides = Maps.newHashMap();
+
+    UrlConnectionHttpClientConfigurations.create(noOverrides)
+        .configureHttpClientBuilder(s3Builder);
+
+    // The builder must be handed a managed client through httpClient(), i.e. a shared resource
+    verify(s3Builder).httpClient(any(ManagedHttpClient.class));
+  }
}