This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 4d9320df36 AWS: Configure builder for reuse of http connection pool in 
SDKv2 (#14161) (#14774)
4d9320df36 is described below

commit 4d9320df367e371b81fee3a7f7db4f7980bcf832
Author: Anurag Mantripragada <[email protected]>
AuthorDate: Fri Dec 5 17:27:01 2025 -0800

    AWS: Configure builder for reuse of http connection pool in SDKv2 (#14161) 
(#14774)
    
    AWS: Configure builder for reuse of http clients in SDKv2
---
 .../aws/ApacheHttpClientConfigurations.java        |  39 ++-
 .../iceberg/aws/BaseHttpClientConfigurations.java  |  75 ++++++
 .../org/apache/iceberg/aws/HttpClientCache.java    | 203 +++++++++++++++
 .../aws/UrlConnectionHttpClientConfigurations.java |  32 ++-
 .../apache/iceberg/aws/TestHttpClientCache.java    | 276 +++++++++++++++++++++
 .../iceberg/aws/TestHttpClientProperties.java      |  67 +++--
 6 files changed, 668 insertions(+), 24 deletions(-)

diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java 
b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
index 95fe34b742..3445928d15 100644
--- 
a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
+++ 
b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
@@ -21,13 +21,16 @@ package org.apache.iceberg.aws;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
 
-class ApacheHttpClientConfigurations {
+class ApacheHttpClientConfigurations extends BaseHttpClientConfigurations {
   private Long connectionTimeoutMs;
   private Long socketTimeoutMs;
   private Long acquisitionTimeoutMs;
@@ -41,10 +44,11 @@ class ApacheHttpClientConfigurations {
 
   private ApacheHttpClientConfigurations() {}
 
-  public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T 
awsClientBuilder) {
-    ApacheHttpClient.Builder apacheHttpClientBuilder = 
ApacheHttpClient.builder();
+  @Override
+  protected SdkHttpClient buildHttpClient() {
+    final ApacheHttpClient.Builder apacheHttpClientBuilder = 
ApacheHttpClient.builder();
     configureApacheHttpClientBuilder(apacheHttpClientBuilder);
-    awsClientBuilder.httpClientBuilder(apacheHttpClientBuilder);
+    return apacheHttpClientBuilder.build();
   }
 
   private void initialize(Map<String, String> httpClientProperties) {
@@ -115,6 +119,31 @@ class ApacheHttpClientConfigurations {
     }
   }
 
+  /**
+   * Builds the key under which the HTTP client for this configuration is cached.
+   *
+   * <p>The key lists each setting below as {@code name=value}, ordered by name, with unset
+   * settings rendered as {@code null}. Two configurations produce the same key only when the
+   * string forms of all these settings match.
+   */
+  @Override
+  protected String generateHttpClientCacheKey() {
+    // Sorted map: the rendered key does not depend on the order of the put calls below
+    Map<String, Object> settings = Maps.newTreeMap();
+    settings.put("type", "apache");
+
+    // timeouts
+    settings.put("connectionTimeoutMs", connectionTimeoutMs);
+    settings.put("socketTimeoutMs", socketTimeoutMs);
+    settings.put("acquisitionTimeoutMs", acquisitionTimeoutMs);
+
+    // connection pool sizing and lifetime
+    settings.put("maxConnections", maxConnections);
+    settings.put("connectionMaxIdleTimeMs", connectionMaxIdleTimeMs);
+    settings.put("connectionTimeToLiveMs", connectionTimeToLiveMs);
+
+    // behaviour flags
+    settings.put("expectContinueEnabled", expectContinueEnabled);
+    settings.put("tcpKeepAliveEnabled", tcpKeepAliveEnabled);
+    settings.put("useIdleConnectionReaperEnabled", useIdleConnectionReaperEnabled);
+
+    // proxy
+    settings.put("proxyEndpoint", proxyEndpoint);
+
+    return settings.entrySet().stream()
+        .map(setting -> setting.getKey() + "=" + Objects.toString(setting.getValue(), "null"))
+        .collect(Collectors.joining(",", "apache[", "]"));
+  }
+
   public static ApacheHttpClientConfigurations create(Map<String, String> 
properties) {
     ApacheHttpClientConfigurations configurations = new 
ApacheHttpClientConfigurations();
     configurations.initialize(properties);
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java 
b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
new file mode 100644
index 0000000000..d4301f0448
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * Common parent of the HTTP client configuration classes. It wires AWS client builders to HTTP
+ * clients that are shared through {@link HttpClientCache}.
+ *
+ * <p>A subclass supplies two things: a cache key describing its configuration and a way to build
+ * the concrete {@link SdkHttpClient}. This class looks the key up in the cache and hands the
+ * resulting ref-counted client to the AWS client builder.
+ *
+ * <p>NOTE(review): the client is installed with {@code httpClient(...)} rather than {@code
+ * httpClientBuilder(...)}. The AWS SDK documentation appears to state that clients supplied this
+ * way are not closed by the SDK, so whoever owns the service client presumably has to close the
+ * managed HTTP client for its reference to be released - confirm against callers.
+ */
+abstract class BaseHttpClientConfigurations {
+
+  private static final HttpClientCache CACHE = HttpClientCache.instance();
+
+  /**
+   * Produce the key that identifies this HTTP client configuration in the cache. Configurations
+   * that return equal keys share one HTTP client.
+   *
+   * <p>Implementations should fold in every setting that changes how the HTTP client behaves
+   * (timeouts, connection settings, proxy configuration, etc.), so that a client is only shared
+   * between identical configurations.
+   *
+   * @return the cache key for this configuration
+   */
+  protected abstract String generateHttpClientCacheKey();
+
+  /**
+   * Create the HTTP client described by this configuration. It is passed to the cache as the
+   * factory, which invokes it only when no client is cached under the key.
+   *
+   * @return a configured {@link SdkHttpClient} instance
+   */
+  protected abstract SdkHttpClient buildHttpClient();
+
+  /**
+   * Install a cache-managed HTTP client on the given AWS client builder.
+   *
+   * <p>The cache is queried with {@link #generateHttpClientCacheKey()}. Each call acquires one
+   * reference on the returned client, whether it was already cached or has just been built via
+   * {@link #buildHttpClient()}.
+   *
+   * @param awsClientBuilder the AWS client builder to configure
+   * @param <T> the type of AWS client builder
+   */
+  public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T awsClientBuilder) {
+    awsClientBuilder.httpClient(
+        CACHE.getOrCreateClient(generateHttpClientCacheKey(), this::buildHttpClient));
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java 
b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
new file mode 100644
index 0000000000..79444a62ae
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ExecutableHttpRequest;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * A cache that manages the lifecycle of shared HTTP clients for AWS SDK v2 using reference
+ * counting. Package-private - only accessed via {@link BaseHttpClientConfigurations}.
+ *
+ * <p>Lookups, acquisitions and releases that go through this cache run inside per-key map
+ * updates ({@code compute} / {@code computeIfPresent}), so one thread cannot close a client
+ * between another thread finding it in the cache and acquiring it. This relies on the map
+ * returned by {@code Maps.newConcurrentMap()} applying those functions atomically per key, as
+ * {@link java.util.concurrent.ConcurrentHashMap} does.
+ */
+final class HttpClientCache {
+  private static final Logger LOG = LoggerFactory.getLogger(HttpClientCache.class);
+
+  private final ConcurrentMap<String, ManagedHttpClient> clients = Maps.newConcurrentMap();
+  private static volatile HttpClientCache instance;
+
+  /** Returns the shared cache instance, creating it on first use. */
+  static HttpClientCache instance() {
+    if (instance == null) {
+      synchronized (HttpClientCache.class) {
+        if (instance == null) {
+          instance = new HttpClientCache();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * Get or create a managed HTTP client for the given configuration. Each call increments the
+   * reference count for the client and returns a ref-counted wrapper.
+   *
+   * @param clientKey unique key identifying the client configuration
+   * @param clientFactory factory to create the HTTP client if not cached
+   * @return a ref-counted HTTP client wrapper
+   */
+  SdkHttpClient getOrCreateClient(String clientKey, Supplier<SdkHttpClient> clientFactory) {
+    // Look up (or create) and acquire in a single map update. Acquiring after the lookup would
+    // let a concurrent release close the client in between, making acquire() throw.
+    return clients.compute(
+        clientKey,
+        (key, cached) -> {
+          ManagedHttpClient managedClient = cached;
+          if (managedClient == null || managedClient.isClosed()) {
+            // nothing cached, or a stale closed entry: replace it with a fresh client
+            LOG.debug("Creating new managed HTTP client for key: {}", key);
+            managedClient = new ManagedHttpClient(clientFactory.get(), key);
+          }
+          return managedClient.acquire();
+        });
+  }
+
+  /**
+   * Release a reference to the HTTP client. When the reference count reaches zero, the client is
+   * closed and removed from the cache. Unknown keys are ignored.
+   *
+   * @param clientKey the key identifying the client to release
+   */
+  void releaseClient(String clientKey) {
+    // Returning null from the remapping function removes the entry, so the decrement, the close
+    // and the removal happen in the same map update.
+    clients.computeIfPresent(
+        clientKey,
+        (key, managedClient) -> {
+          boolean closedNow = managedClient.release();
+          return closedNow || managedClient.isClosed() ? null : managedClient;
+        });
+  }
+
+  @VisibleForTesting
+  Map<String, ManagedHttpClient> clients() {
+    return Collections.unmodifiableMap(clients);
+  }
+
+  /**
+   * Close every cached client, regardless of its reference count, and empty the cache.
+   *
+   * <p>Each entry is removed before it is closed, so an entry added concurrently is either
+   * closed here or left in the cache untouched - never dropped while still open.
+   */
+  @VisibleForTesting
+  void clear() {
+    for (Map.Entry<String, ManagedHttpClient> entry : clients.entrySet()) {
+      if (clients.remove(entry.getKey(), entry.getValue())) {
+        entry.getValue().forceClose();
+      }
+    }
+  }
+
+  /**
+   * Managed HTTP client wrapper that provides reference counting for lifecycle management. The
+   * HTTP client is closed when the reference count reaches zero.
+   *
+   * <p>All state changes happen in {@code synchronized} methods. {@code refCount} and {@code
+   * closed} are volatile so the unsynchronized accessors observe the latest values.
+   */
+  static class ManagedHttpClient implements SdkHttpClient {
+    private final SdkHttpClient httpClient;
+    private final String clientKey;
+    private volatile int refCount = 0;
+    private volatile boolean closed = false;
+
+    ManagedHttpClient(SdkHttpClient httpClient, String clientKey) {
+      this.httpClient = httpClient;
+      this.clientKey = clientKey;
+      LOG.debug("Created managed HTTP client: key={}", clientKey);
+    }
+
+    /**
+     * Acquire a reference to the HTTP client, incrementing the reference count.
+     *
+     * @return the ref-counted wrapper client
+     * @throws IllegalStateException if the client has already been closed
+     */
+    synchronized ManagedHttpClient acquire() {
+      if (closed) {
+        throw new IllegalStateException("Cannot acquire closed HTTP client: " + clientKey);
+      }
+      refCount++;
+      LOG.debug("Acquired HTTP client: key={}, refCount={}", clientKey, refCount);
+      return this;
+    }
+
+    /**
+     * Release a reference to the HTTP client, decrementing the reference count. If the count
+     * reaches zero, the client is closed. Releasing an already closed client only logs a
+     * warning.
+     *
+     * @return true if the client was closed, false otherwise
+     */
+    synchronized boolean release() {
+      if (closed) {
+        LOG.warn("Attempted to release already closed HTTP client: key={}", clientKey);
+        return false;
+      }
+
+      refCount--;
+      LOG.debug("Released HTTP client: key={}, refCount={}", clientKey, refCount);
+      if (refCount == 0) {
+        return closeHttpClient();
+      } else if (refCount < 0) {
+        // released more often than acquired: clamp the count instead of letting it drift
+        LOG.warn(
+            "HTTP client reference count went negative key={}, refCount={}", clientKey, refCount);
+        refCount = 0;
+      }
+      return false;
+    }
+
+    /**
+     * Close the underlying HTTP client immediately, ignoring outstanding references. Used when
+     * the whole cache is cleared; later {@link #release()} calls become warning-only no-ops.
+     */
+    synchronized void forceClose() {
+      refCount = 0;
+      closeHttpClient();
+    }
+
+    @VisibleForTesting
+    SdkHttpClient httpClient() {
+      return httpClient;
+    }
+
+    /**
+     * Close the HTTP client if not already closed. Failures while closing are logged and not
+     * rethrown. Callers must hold this object's monitor.
+     *
+     * @return true if the client was closed by this call, false if already closed
+     */
+    private boolean closeHttpClient() {
+      if (!closed) {
+        closed = true;
+        LOG.debug("Closing HTTP client: key={}", clientKey);
+        try {
+          httpClient.close();
+        } catch (Exception e) {
+          LOG.error("Failed to close HTTP client: key={}", clientKey, e);
+        }
+        return true;
+      }
+      return false;
+    }
+
+    @VisibleForTesting
+    int refCount() {
+      return refCount;
+    }
+
+    @VisibleForTesting
+    boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
+      return httpClient.prepareRequest(request);
+    }
+
+    @Override
+    public String clientName() {
+      return httpClient.clientName();
+    }
+
+    /**
+     * Give back one reference through the cache. The underlying client is closed only when the
+     * last reference is released.
+     *
+     * <p>Call this exactly once per acquisition: the release is looked up by key, so an extra
+     * call would drop a reference held by another user of the same configuration.
+     */
+    @Override
+    public void close() {
+      HttpClientCache.instance().releaseClient(clientKey);
+    }
+  }
+}
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
 
b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
index ff8dafcf06..273baa6748 100644
--- 
a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
+++ 
b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
@@ -21,13 +21,16 @@ package org.apache.iceberg.aws;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.urlconnection.ProxyConfiguration;
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
 
-class UrlConnectionHttpClientConfigurations {
+class UrlConnectionHttpClientConfigurations extends 
BaseHttpClientConfigurations {
 
   private Long httpClientUrlConnectionConnectionTimeoutMs;
   private Long httpClientUrlConnectionSocketTimeoutMs;
@@ -35,11 +38,12 @@ class UrlConnectionHttpClientConfigurations {
 
   private UrlConnectionHttpClientConfigurations() {}
 
-  public <T extends AwsSyncClientBuilder> void configureHttpClientBuilder(T 
awsClientBuilder) {
-    UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
+  @Override
+  protected SdkHttpClient buildHttpClient() {
+    final UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
         UrlConnectionHttpClient.builder();
     configureUrlConnectionHttpClientBuilder(urlConnectionHttpClientBuilder);
-    awsClientBuilder.httpClientBuilder(urlConnectionHttpClientBuilder);
+    return urlConnectionHttpClientBuilder.build();
   }
 
   private void initialize(Map<String, String> httpClientProperties) {
@@ -71,6 +75,24 @@ class UrlConnectionHttpClientConfigurations {
     }
   }
 
+  /**
+   * Builds the key under which the HTTP client for this configuration is cached.
+   *
+   * <p>The key lists each setting below as {@code name=value}, ordered by name, with unset
+   * settings rendered as {@code null}. Two configurations produce the same key only when the
+   * string forms of all these settings match.
+   */
+  @Override
+  protected String generateHttpClientCacheKey() {
+    // Sorted map: the rendered key does not depend on the order of the put calls below
+    Map<String, Object> settings = Maps.newTreeMap();
+    settings.put("type", "urlconnection");
+    settings.put("proxyEndpoint", proxyEndpoint);
+    settings.put("socketTimeoutMs", httpClientUrlConnectionSocketTimeoutMs);
+    settings.put("connectionTimeoutMs", httpClientUrlConnectionConnectionTimeoutMs);
+
+    return settings.entrySet().stream()
+        .map(setting -> setting.getKey() + "=" + Objects.toString(setting.getValue(), "null"))
+        .collect(Collectors.joining(",", "urlconnection[", "]"));
+  }
+
   public static UrlConnectionHttpClientConfigurations create(
       Map<String, String> httpClientProperties) {
     UrlConnectionHttpClientConfigurations configurations =
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java 
b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
new file mode 100644
index 0000000000..9febf37cbd
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * Tests for {@link HttpClientCache} and its ref-counted {@link ManagedHttpClient} wrapper:
+ * sharing by key, reference counting, closing on last release and behaviour after close.
+ */
+public class TestHttpClientCache {
+
+  @Mock private SdkHttpClient httpClient1;
+  @Mock private SdkHttpClient httpClient2;
+  @Mock private Supplier<SdkHttpClient> httpClientFactory1;
+  @Mock private Supplier<SdkHttpClient> httpClientFactory2;
+
+  private HttpClientCache cache;
+
+  @BeforeEach
+  public void before() {
+    MockitoAnnotations.openMocks(this);
+    cache = HttpClientCache.instance();
+    // The cache is a shared singleton: drop whatever earlier tests left behind
+    cache.clear();
+
+    when(httpClientFactory1.get()).thenReturn(httpClient1);
+    when(httpClientFactory2.get()).thenReturn(httpClient2);
+  }
+
+  @Test
+  public void singletonPattern() {
+    HttpClientCache instance1 = HttpClientCache.instance();
+    HttpClientCache instance2 = HttpClientCache.instance();
+
+    assertThat(instance1).isSameAs(instance2);
+  }
+
+  @Test
+  public void clientCaching() {
+    final String cacheKey = "test-key";
+
+    // First call should create client and increment ref count
+    SdkHttpClient client1 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+    verify(httpClientFactory1, times(1)).get();
+
+    // Second call with same key should return cached client and increment ref count again
+    SdkHttpClient client2 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+    verify(httpClientFactory1, times(1)).get(); // Factory should not be called again
+
+    assertThat(client1).isSameAs(client2);
+
+    // Verify reference count is 2
+    ManagedHttpClient managedClient = cache.clients().get(cacheKey);
+    assertThat(managedClient.refCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void differentKeysCreateDifferentClients() {
+    SdkHttpClient client1 = cache.getOrCreateClient("test-key-1", httpClientFactory1);
+    SdkHttpClient client2 = cache.getOrCreateClient("test-key-2", httpClientFactory2);
+
+    verify(httpClientFactory1, times(1)).get();
+    verify(httpClientFactory2, times(1)).get();
+
+    assertThat(client1).isNotSameAs(client2);
+  }
+
+  @Test
+  public void referenceCountingAndCleanup() {
+    SdkHttpClient mockClient = mock(SdkHttpClient.class);
+    final String cacheKey = "test-key";
+
+    ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+    // Acquire twice
+    ManagedHttpClient client1 = managedClient.acquire();
+    ManagedHttpClient client2 = managedClient.acquire();
+
+    assertThat(client1).isSameAs(client2);
+    assertThat(managedClient.refCount()).isEqualTo(2);
+
+    // First release should not close
+    managedClient.release();
+    assertThat(managedClient.refCount()).isEqualTo(1);
+    assertThat(managedClient.isClosed()).isFalse();
+    verify(mockClient, times(0)).close();
+
+    // Second release should close
+    managedClient.release();
+    assertThat(managedClient.refCount()).isEqualTo(0);
+    assertThat(managedClient.isClosed()).isTrue();
+    verify(mockClient, times(1)).close();
+  }
+
+  @Test
+  public void acquireAfterCloseThrows() {
+    SdkHttpClient mockClient = mock(SdkHttpClient.class);
+    final String cacheKey = "test-key";
+
+    ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+    // Acquire and release to close
+    managedClient.acquire();
+    managedClient.release();
+
+    assertThat(managedClient.isClosed()).isTrue();
+
+    // Trying to acquire a closed client should throw
+    assertThatThrownBy(managedClient::acquire)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot acquire closed HTTP client");
+  }
+
+  @Test
+  public void releaseRemovesFromRegistry() {
+    final String cacheKey = "test-key";
+
+    // Create client (refCount = 1)
+    SdkHttpClient client1 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+    assertThat(client1).isNotNull();
+
+    // clients() is a live read-only view, so it reflects the removal below
+    Map<String, ManagedHttpClient> clients = cache.clients();
+    assertThat(clients).containsKey(cacheKey);
+
+    // Verify ref count is 1
+    assertThat(clients.get(cacheKey).refCount()).isEqualTo(1);
+
+    // Release (refCount = 0, should close and remove)
+    cache.releaseClient(cacheKey);
+
+    // Client should be removed from map after close
+    assertThat(clients).doesNotContainKey(cacheKey);
+    verify(httpClient1, times(1)).close();
+  }
+
+  @Test
+  public void concurrentAccess() throws InterruptedException {
+    final String cacheKey = "concurrent-test-key";
+    int threadCount = 10;
+    Thread[] threads = new Thread[threadCount];
+    SdkHttpClient[] results = new SdkHttpClient[threadCount];
+
+    // Create multiple threads that access the same cache key
+    for (int i = 0; i < threadCount; i++) {
+      final int index = i;
+      threads[i] =
+          new Thread(() -> results[index] = cache.getOrCreateClient(cacheKey, httpClientFactory1));
+    }
+
+    // Start all threads
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    // Wait for all threads to complete; join() also makes their writes to results visible here
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    // Verify factory was called only once (proper caching under concurrency)
+    verify(httpClientFactory1, times(1)).get();
+
+    // Verify all threads got the same client instance
+    SdkHttpClient expectedClient = results[0];
+    for (int i = 1; i < threadCount; i++) {
+      assertThat(results[i]).isSameAs(expectedClient);
+    }
+
+    // Verify reference count equals number of threads
+    ManagedHttpClient managedClient = cache.clients().get(cacheKey);
+    assertThat(managedClient.refCount()).isEqualTo(threadCount);
+  }
+
+  @Test
+  public void registryClear() {
+    Map<String, ManagedHttpClient> clients = cache.clients();
+
+    // Create some clients
+    cache.getOrCreateClient("key1", httpClientFactory1);
+    cache.getOrCreateClient("key2", httpClientFactory2);
+
+    // Verify clients were stored
+    assertThat(clients).hasSize(2);
+
+    // Shutdown should clean up the map
+    cache.clear();
+
+    // Map should be empty after shutdown
+    assertThat(clients).isEmpty();
+
+    // Both clients should be closed
+    verify(httpClient1, times(1)).close();
+    verify(httpClient2, times(1)).close();
+  }
+
+  @Test
+  public void doubleReleaseDoesNotCauseNegativeRefCount() {
+    SdkHttpClient mockClient = mock(SdkHttpClient.class);
+    final String cacheKey = "test-key";
+
+    ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+    // Acquire once
+    managedClient.acquire();
+    assertThat(managedClient.refCount()).isEqualTo(1);
+
+    // First release should close the client (refCount goes to 0)
+    boolean closed = managedClient.release();
+    assertThat(closed).isTrue();
+    assertThat(managedClient.refCount()).isEqualTo(0);
+    assertThat(managedClient.isClosed()).isTrue();
+    verify(mockClient, times(1)).close();
+
+    // Second release on already closed client should be a no-op
+    // The closed flag prevents decrement, so refCount stays at 0
+    boolean closedAgain = managedClient.release();
+    assertThat(closedAgain).isFalse();
+    assertThat(managedClient.refCount()).isEqualTo(0); // Should still be 0, not negative
+    assertThat(managedClient.isClosed()).isTrue();
+    verify(mockClient, times(1)).close(); // Close should not be called again
+  }
+
+  @Test
+  public void multipleReleasesAfterClose() {
+    SdkHttpClient mockClient = mock(SdkHttpClient.class);
+    final String cacheKey = "test-key";
+
+    ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+    // Acquire once
+    managedClient.acquire();
+    assertThat(managedClient.refCount()).isEqualTo(1);
+
+    // Release to close
+    managedClient.release();
+    assertThat(managedClient.isClosed()).isTrue();
+    assertThat(managedClient.refCount()).isEqualTo(0);
+
+    // Try releasing multiple more times (simulating a bug in caller code)
+    for (int i = 0; i < 5; i++) {
+      boolean result = managedClient.release();
+      assertThat(result).isFalse(); // Should return false, not try to close again
+      assertThat(managedClient.refCount()).isEqualTo(0); // RefCount should never go negative
+    }
+
+    // Close should only have been called once
+    verify(mockClient, times(1)).close();
+  }
+}
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java 
b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
index b0602a0749..378e5e6ca9 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
@@ -20,8 +20,11 @@ package org.apache.iceberg.aws;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
 import java.util.Map;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -40,16 +43,21 @@ public class TestHttpClientProperties {
     properties.put(HttpClientProperties.CLIENT_TYPE, "urlconnection");
     HttpClientProperties httpProperties = new HttpClientProperties(properties);
     S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
-    ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
-        ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+    ArgumentCaptor<SdkHttpClient> httpClientCaptor = 
ArgumentCaptor.forClass(SdkHttpClient.class);
 
     httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
-    
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
-    SdkHttpClient.Builder capturedHttpClientBuilder = 
httpClientBuilderCaptor.getValue();
+    Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+    SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
 
-    assertThat(capturedHttpClientBuilder)
-        .as("Should use url connection http client")
-        .isInstanceOf(UrlConnectionHttpClient.Builder.class);
+    assertThat(capturedHttpClient)
+        .as("Should use managed SDK http client")
+        .isInstanceOf(ManagedHttpClient.class);
+
+    // Verify the underlying delegate is UrlConnectionHttpClient
+    ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+    assertThat(managedClient.httpClient())
+        .as("Underlying client should be UrlConnectionHttpClient")
+        .isInstanceOf(UrlConnectionHttpClient.class);
   }
 
   @Test
@@ -58,15 +66,21 @@ public class TestHttpClientProperties {
     properties.put(HttpClientProperties.CLIENT_TYPE, "apache");
     HttpClientProperties httpProperties = new HttpClientProperties(properties);
     S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
-    ArgumentCaptor<SdkHttpClient.Builder> httpClientBuilderCaptor =
-        ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+    ArgumentCaptor<SdkHttpClient> httpClientCaptor = 
ArgumentCaptor.forClass(SdkHttpClient.class);
 
     httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
-    
Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
-    SdkHttpClient.Builder capturedHttpClientBuilder = 
httpClientBuilderCaptor.getValue();
-    assertThat(capturedHttpClientBuilder)
-        .as("Should use apache http client")
-        .isInstanceOf(ApacheHttpClient.Builder.class);
+    Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+    SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
+
+    assertThat(capturedHttpClient)
+        .as("Should use managed SDK http client")
+        .isInstanceOf(ManagedHttpClient.class);
+
+    // Verify the underlying delegate is ApacheHttpClient
+    ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+    assertThat(managedClient.httpClient())
+        .as("Underlying client should be ApacheHttpClient")
+        .isInstanceOf(ApacheHttpClient.class);
   }
 
   @Test
@@ -80,4 +94,29 @@ public class TestHttpClientProperties {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Unrecognized HTTP client type test");
   }
+
+  @Test
+  public void testApacheHttpClientConfiguredAsSharedResource() {
+    S3ClientBuilder s3Builder = Mockito.mock(S3ClientBuilder.class);
+
+    // No HTTP client properties are set for this configuration
+    ApacheHttpClientConfigurations.create(Maps.newHashMap()).configureHttpClientBuilder(s3Builder);
+
+    // The builder must be handed the cache-managed wrapper via httpClient()
+    verify(s3Builder).httpClient(any(ManagedHttpClient.class));
+  }
+
+  @Test
+  public void testUrlConnectionHttpClientConfiguredAsSharedResource() {
+    S3ClientBuilder s3Builder = Mockito.mock(S3ClientBuilder.class);
+
+    // No HTTP client properties are set for this configuration
+    Map<String, String> noProperties = Maps.newHashMap();
+    UrlConnectionHttpClientConfigurations.create(noProperties)
+        .configureHttpClientBuilder(s3Builder);
+
+    // The builder must be handed the cache-managed wrapper via httpClient()
+    verify(s3Builder).httpClient(any(ManagedHttpClient.class));
+  }
 }

Reply via email to