This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b01c96594a [fix][client] Fix timeout handling in Pulsar Admin client 
(#23128)
3b01c96594a is described below

commit 3b01c96594ae1af215018b1e1df29e5416f240d9
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Aug 7 08:22:25 2024 +0300

    [fix][client] Fix timeout handling in Pulsar Admin client (#23128)
---
 .../pulsar/client/admin/internal/BaseResource.java |   8 +-
 .../pulsar/client/admin/internal/BookiesImpl.java  |   4 +-
 .../client/admin/internal/BrokerStatsImpl.java     |   4 +-
 .../pulsar/client/admin/internal/BrokersImpl.java  |   4 +-
 .../pulsar/client/admin/internal/ClustersImpl.java |   4 +-
 .../client/admin/internal/ComponentResource.java   |   4 +-
 .../client/admin/internal/FunctionsImpl.java       |   4 +-
 .../client/admin/internal/NamespacesImpl.java      |   4 +-
 .../admin/internal/NonPersistentTopicsImpl.java    |   4 +-
 .../pulsar/client/admin/internal/PackagesImpl.java |   4 +-
 .../client/admin/internal/ProxyStatsImpl.java      |   4 +-
 .../client/admin/internal/PulsarAdminImpl.java     |  46 +++----
 .../client/admin/internal/ResourceGroupsImpl.java  |   4 +-
 .../client/admin/internal/ResourceQuotasImpl.java  |   4 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  |   4 +-
 .../pulsar/client/admin/internal/SinksImpl.java    |   4 +-
 .../pulsar/client/admin/internal/SourcesImpl.java  |   4 +-
 .../pulsar/client/admin/internal/TenantsImpl.java  |   4 +-
 .../pulsar/client/admin/internal/TopicsImpl.java   |   4 +-
 .../client/admin/internal/TransactionsImpl.java    |   4 +-
 .../pulsar/client/admin/internal/WorkerImpl.java   |   4 +-
 .../admin/internal/http/AsyncHttpConnector.java    |  37 ++++--
 .../internal/http/AsyncHttpConnectorTest.java      | 140 +++++++++++++++++++++
 pulsar-client-admin/src/test/resources/log4j2.xml  |  41 ++++++
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  |  14 +--
 25 files changed, 280 insertions(+), 82 deletions(-)

diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 22550666cb6..ea39053c2ce 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -62,11 +62,11 @@ public abstract class BaseResource {
     private static final Logger log = 
LoggerFactory.getLogger(BaseResource.class);
 
     protected final Authentication auth;
-    protected final long readTimeoutMs;
+    protected final long requestTimeoutMs;
 
-    protected BaseResource(Authentication auth, long readTimeoutMs) {
+    protected BaseResource(Authentication auth, long requestTimeoutMs) {
         this.auth = auth;
-        this.readTimeoutMs = readTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
     }
 
     public Builder request(final WebTarget target) throws PulsarAdminException 
{
@@ -339,7 +339,7 @@ public abstract class BaseResource {
 
     protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws 
PulsarAdminException {
         try {
-            return executor.get().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+            return executor.get().get(this.requestTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
           throw new PulsarAdminException(e);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
index 2286fb8c8a3..0bf92e02677 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -32,8 +32,8 @@ import 
org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 public class BookiesImpl extends BaseResource implements Bookies {
     private final WebTarget adminBookies;
 
-    public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) 
{
-        super(auth, readTimeoutMs);
+    public BookiesImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminBookies = web.path("/admin/v2/bookies");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
index e409d6f4492..6ddabe9837e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
@@ -38,8 +38,8 @@ public class BrokerStatsImpl extends BaseResource implements 
BrokerStats {
     private final WebTarget adminBrokerStats;
     private final WebTarget adminV2BrokerStats;
 
-    public BrokerStatsImpl(WebTarget target, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public BrokerStatsImpl(WebTarget target, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminBrokerStats = target.path("/admin/broker-stats");
         adminV2BrokerStats = target.path("/admin/v2/broker-stats");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 7b4ebb1778d..b82c3fd0f41 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -37,8 +37,8 @@ import org.apache.pulsar.common.util.Codec;
 public class BrokersImpl extends BaseResource implements Brokers {
     private final WebTarget adminBrokers;
 
-    public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) 
{
-        super(auth, readTimeoutMs);
+    public BrokersImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminBrokers = web.path("admin/v2/brokers");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 231d4506d61..24048ea3c0a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -47,8 +47,8 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
 
     private final WebTarget adminClusters;
 
-    public ClustersImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public ClustersImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminClusters = web.path("/admin/v2/clusters");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
index 8beecff3897..0301f0fc2ee 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -37,8 +37,8 @@ import org.asynchttpclient.RequestBuilder;
  */
 public class ComponentResource extends BaseResource {
 
-    protected ComponentResource(Authentication auth, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    protected ComponentResource(Authentication auth, long requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
     }
 
     public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder 
requestBuilder) throws PulsarAdminException {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index bb4cb0c1ef8..97c42e5c1a9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -72,8 +72,8 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
     private final WebTarget functions;
     private final AsyncHttpClient asyncHttpClient;
 
-    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.functions = web.path("/admin/v3/functions");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c7492a26ab3..7d41c7203d2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -64,8 +64,8 @@ public class NamespacesImpl extends BaseResource implements 
Namespaces {
     private final WebTarget adminNamespaces;
     private final WebTarget adminV2Namespaces;
 
-    public NamespacesImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public NamespacesImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminNamespaces = web.path("/admin/namespaces");
         adminV2Namespaces = web.path("/admin/v2/namespaces");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 76727cd1e0f..e98d44fdc4a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -38,8 +38,8 @@ public class NonPersistentTopicsImpl extends BaseResource 
implements NonPersiste
     private final WebTarget adminNonPersistentTopics;
     private final WebTarget adminV2NonPersistentTopics;
 
-    public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminNonPersistentTopics = web.path("/admin");
         adminV2NonPersistentTopics = web.path("/admin/v2");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index 694c2160b0f..d69bef448c1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -57,8 +57,8 @@ public class PackagesImpl extends ComponentResource 
implements Packages {
     private final WebTarget packages;
     private final AsyncHttpClient httpClient;
 
-    public PackagesImpl(WebTarget webTarget, Authentication auth, 
AsyncHttpClient client, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public PackagesImpl(WebTarget webTarget, Authentication auth, 
AsyncHttpClient client, long requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.httpClient = client;
         this.packages = webTarget.path("/admin/v3/packages");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
index e98d9bf57b3..7ed07a1a6ad 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
@@ -32,8 +32,8 @@ public class ProxyStatsImpl extends BaseResource implements 
ProxyStats {
 
     private final WebTarget adminProxyStats;
 
-    public ProxyStatsImpl(WebTarget target, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public ProxyStatsImpl(WebTarget target, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminProxyStats = target.path("/proxy-stats");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index 39347850cf6..e00caa6dbbc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -159,29 +159,29 @@ public class PulsarAdminImpl implements PulsarAdmin {
                 Math.toIntExact(clientConfigData.getRequestTimeoutMs()),
                 clientConfigData.getAutoCertRefreshSeconds());
 
-        long readTimeoutMs = clientConfigData.getReadTimeoutMs();
-        this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
-        this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
-        this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
-        this.proxyStats = new ProxyStatsImpl(root, auth, readTimeoutMs);
-        this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
-        this.resourcegroups = new ResourceGroupsImpl(root, auth, 
readTimeoutMs);
-        this.properties = new TenantsImpl(root, auth, readTimeoutMs);
-        this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
-        this.topics = new TopicsImpl(root, auth, readTimeoutMs);
-        this.localTopicPolicies = new TopicPoliciesImpl(root, auth, 
readTimeoutMs, false);
-        this.globalTopicPolicies = new TopicPoliciesImpl(root, auth, 
readTimeoutMs, true);
-        this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, 
readTimeoutMs);
-        this.resourceQuotas = new ResourceQuotasImpl(root, auth, 
readTimeoutMs);
-        this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs, 
topics);
-        this.functions = new FunctionsImpl(root, auth, 
asyncHttpConnector.getHttpClient(), readTimeoutMs);
-        this.sources = new SourcesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), readTimeoutMs);
-        this.sinks = new SinksImpl(root, auth, 
asyncHttpConnector.getHttpClient(), readTimeoutMs);
-        this.worker = new WorkerImpl(root, auth, readTimeoutMs);
-        this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
-        this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
-        this.packages = new PackagesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), readTimeoutMs);
-        this.transactions = new TransactionsImpl(root, auth, readTimeoutMs);
+        long requestTimeoutMs = clientConfigData.getRequestTimeoutMs();
+        this.clusters = new ClustersImpl(root, auth, requestTimeoutMs);
+        this.brokers = new BrokersImpl(root, auth, requestTimeoutMs);
+        this.brokerStats = new BrokerStatsImpl(root, auth, requestTimeoutMs);
+        this.proxyStats = new ProxyStatsImpl(root, auth, requestTimeoutMs);
+        this.tenants = new TenantsImpl(root, auth, requestTimeoutMs);
+        this.resourcegroups = new ResourceGroupsImpl(root, auth, 
requestTimeoutMs);
+        this.properties = new TenantsImpl(root, auth, requestTimeoutMs);
+        this.namespaces = new NamespacesImpl(root, auth, requestTimeoutMs);
+        this.topics = new TopicsImpl(root, auth, requestTimeoutMs);
+        this.localTopicPolicies = new TopicPoliciesImpl(root, auth, 
requestTimeoutMs, false);
+        this.globalTopicPolicies = new TopicPoliciesImpl(root, auth, 
requestTimeoutMs, true);
+        this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, 
requestTimeoutMs);
+        this.resourceQuotas = new ResourceQuotasImpl(root, auth, 
requestTimeoutMs);
+        this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs, 
topics);
+        this.functions = new FunctionsImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.sources = new SourcesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.sinks = new SinksImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.worker = new WorkerImpl(root, auth, requestTimeoutMs);
+        this.schemas = new SchemasImpl(root, auth, requestTimeoutMs);
+        this.bookies = new BookiesImpl(root, auth, requestTimeoutMs);
+        this.packages = new PackagesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs);
 
         if (originalCtxLoader != null) {
             Thread.currentThread().setContextClassLoader(originalCtxLoader);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
index a8cef60232f..4e7230eebd9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.ResourceGroup;
 public class ResourceGroupsImpl extends BaseResource implements ResourceGroups 
{
     private final WebTarget adminResourceGroups;
 
-    public ResourceGroupsImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public ResourceGroupsImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminResourceGroups = web.path("/admin/v2/resourcegroups");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index 1e80c9eda94..68884d99448 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -33,8 +33,8 @@ public class ResourceQuotasImpl extends BaseResource 
implements ResourceQuotas {
     private final WebTarget adminQuotas;
     private final WebTarget adminV2Quotas;
 
-    public ResourceQuotasImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public ResourceQuotasImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminQuotas = web.path("/admin/resource-quotas");
         adminV2Quotas = web.path("/admin/v2/resource-quotas");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 593eb67fc0d..28b435ab567 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -46,8 +46,8 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
     private final WebTarget adminV2;
     private final WebTarget adminV1;
 
-    public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) 
{
-        super(auth, readTimeoutMs);
+    public SchemasImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.adminV1 = web.path("/admin/schemas");
         this.adminV2 = web.path("/admin/v2/schemas");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index c14f75ab367..a30f51264cc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -53,8 +53,8 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
     private final WebTarget sink;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.sink = web.path("/admin/v3/sink");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 6e5b84c7f04..8821ed61ce5 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -52,8 +52,8 @@ public class SourcesImpl extends ComponentResource implements 
Sources, Source {
     private final WebTarget source;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.source = web.path("/admin/v3/source");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 9b70e39ec49..c12f3754b4a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -34,8 +34,8 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 public class TenantsImpl extends BaseResource implements Tenants, Properties {
     private final WebTarget adminTenants;
 
-    public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs) 
{
-        super(auth, readTimeoutMs);
+    public TenantsImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminTenants = web.path("/admin/v2/tenants");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b7a8b876640..9c4a6eef753 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -137,8 +137,8 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
     public static final String PROPERTY_SHADOW_SOURCE_KEY = 
"PULSAR.SHADOW_SOURCE";
 
-    public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public TopicsImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminTopics = web.path("/admin");
         adminV2Topics = web.path("/admin/v2");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 460478787eb..a0b9dd234d9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -46,8 +46,8 @@ import 
org.apache.pulsar.common.stats.PositionInPendingAckStats;
 public class TransactionsImpl extends BaseResource implements Transactions {
     private final WebTarget adminV3Transactions;
 
-    public TransactionsImpl(WebTarget web, Authentication auth, long 
readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public TransactionsImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         adminV3Transactions = web.path("/admin/v3/transactions");
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 60b1226d581..12a691edb08 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -40,8 +40,8 @@ public class WorkerImpl extends BaseResource implements 
Worker {
     private final WebTarget workerStats;
     private final WebTarget worker;
 
-    public WorkerImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
-        super(auth, readTimeoutMs);
+    public WorkerImpl(WebTarget web, Authentication auth, long 
requestTimeoutMs) {
+        super(auth, requestTimeoutMs);
         this.worker = web.path("/admin/v2/worker");
         this.workerStats = web.path("/admin/v2/worker-stats");
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 9ad0ce5029c..a0569c391ad 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -35,7 +35,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.client.Client;
@@ -59,6 +58,7 @@ import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
@@ -74,11 +74,11 @@ import org.glassfish.jersey.client.spi.Connector;
  */
 @Slf4j
 public class AsyncHttpConnector implements Connector {
-    private static final TimeoutException READ_TIMEOUT_EXCEPTION =
-            FutureUtil.createTimeoutException("Read timeout", 
AsyncHttpConnector.class, "retryOrTimeout(...)");
+    private static final TimeoutException REQUEST_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Request timeout", 
AsyncHttpConnector.class, "retryOrTimeout(...)");
     @Getter
     private final AsyncHttpClient httpClient;
-    private final Duration readTimeout;
+    private final Duration requestTimeout;
     private final int maxRetries;
     private final PulsarServiceNameResolver serviceNameResolver;
     private final ScheduledExecutorService delayer = 
Executors.newScheduledThreadPool(1,
@@ -185,7 +185,7 @@ public class AsyncHttpConnector implements Connector {
             
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
         }
         httpClient = new DefaultAsyncHttpClient(confBuilder.build());
-        this.readTimeout = Duration.ofMillis(readTimeoutMs);
+        this.requestTimeout = requestTimeoutMs > 0 ? 
Duration.ofMillis(requestTimeoutMs) : null;
         this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
     }
 
@@ -264,9 +264,10 @@ public class AsyncHttpConnector implements Connector {
     private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
         final CompletableFuture<Response> resultFuture = new 
CompletableFuture<>();
         retryOperation(resultFuture, () -> 
oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
-        CompletableFuture<Response> timeoutAfter = 
FutureUtil.createFutureWithTimeout(readTimeout, delayer,
-                () -> READ_TIMEOUT_EXCEPTION);
-        return resultFuture.applyToEither(timeoutAfter, Function.identity());
+        if (requestTimeout != null) {
+            FutureUtil.addTimeoutHandling(resultFuture, requestTimeout, 
delayer, () -> REQUEST_TIMEOUT_EXCEPTION);
+        }
+        return resultFuture;
     }
 
     private <T> void retryOperation(
@@ -285,11 +286,18 @@ public class AsyncHttpConnector implements Connector {
                                         new RetryException("Operation future 
was cancelled.", throwable));
                             } else {
                                 if (retries > 0) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Retrying operation. 
Remaining retries: {}", retries);
+                                    }
                                     retryOperation(
                                             resultFuture,
                                             operation,
                                             retries - 1);
                                 } else {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Number of retries has been 
exhausted. Failing the operation.",
+                                                throwable);
+                                    }
                                     resultFuture.completeExceptionally(
                                         new RetryException("Could not complete 
the operation. Number of retries "
                                             + "has been exhausted. Failed 
reason: " + throwable.getMessage(),
@@ -315,7 +323,7 @@ public class AsyncHttpConnector implements Connector {
         }
     }
 
-    private CompletableFuture<Response> oneShot(InetSocketAddress host, 
ClientRequest request) {
+    protected CompletableFuture<Response> oneShot(InetSocketAddress host, 
ClientRequest request) {
         ClientRequest currentRequest = new ClientRequest(request);
         URI newUri = replaceWithNew(host, currentRequest.getUri());
         currentRequest.setUri(newUri);
@@ -347,7 +355,16 @@ public class AsyncHttpConnector implements Connector {
             builder.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
         }
 
-        return builder.execute().toCompletableFuture();
+        ListenableFuture<Response> responseFuture = builder.execute();
+        CompletableFuture<Response> completableFuture = 
responseFuture.toCompletableFuture();
+        completableFuture.whenComplete((response, throwable) -> {
+            if (throwable != null && (throwable instanceof 
CancellationException
+                    || throwable instanceof TimeoutException)) {
+                // abort the request if the future is cancelled or timed out
+                responseFuture.abort(throwable);
+            }
+        });
+        return completableFuture;
     }
 
     @Override
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
new file mode 100644
index 00000000000..dd3fb40ae9a
--- /dev/null
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.stubbing.Scenario;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.Response;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.internal.PropertiesDelegate;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class AsyncHttpConnectorTest {
+    WireMockServer server;
+
+    @BeforeClass(alwaysRun = true)
+    void beforeClass() throws IOException {
+        server = new WireMockServer(WireMockConfiguration.wireMockConfig()
+                .port(0));
+        server.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    void afterClass() {
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    static class TestClientRequest extends ClientRequest {
+        public TestClientRequest(URI uri, ClientConfig clientConfig, 
PropertiesDelegate propertiesDelegate) {
+            super(uri, clientConfig, propertiesDelegate);
+        }
+    }
+
+    @Test
+    public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, 
ExecutionException, InterruptedException {
+        server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                .inScenario("once")
+                .whenScenarioStateIs(Scenario.STARTED)
+                .willSetStateTo("next")
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "application/json")
+                        .withBody("[\"test-cluster\"]")));
+
+        server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                        .inScenario("once")
+                        .whenScenarioStateIs("next")
+                        .willSetStateTo("retried")
+                .willReturn(aResponse().withStatus(500)));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://localhost:"; + server.port());
+
+        int requestTimeout = 500;
+
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        Executor delayedExecutor = runnable -> {
+            scheduledExecutor.schedule(runnable, requestTimeout, 
TimeUnit.MILLISECONDS);
+        };
+        @Cleanup
+        AsyncHttpConnector connector = new AsyncHttpConnector(5000, 
requestTimeout,
+                requestTimeout, 0, conf, false) {
+            @Override
+            protected CompletableFuture<Response> oneShot(InetSocketAddress 
host, ClientRequest request) {
+                // delay the response to simulate a timeout
+                return super.oneShot(host, request)
+                        .thenApplyAsync(response -> {
+                            return response;
+                        }, delayedExecutor);
+            }
+        };
+
+        JerseyClient jerseyClient = JerseyClientBuilder.createClient();
+        ClientConfig clientConfig = jerseyClient.getConfiguration();
+        PropertiesDelegate propertiesDelegate = new MapPropertiesDelegate();
+        URI requestUri = URI.create("http://localhost:"; + server.port() + 
"/admin/v2/clusters");
+        ClientRequest request = new TestClientRequest(requestUri, 
clientConfig, propertiesDelegate);
+        request.setMethod("GET");
+        CompletableFuture<ClientResponse> future = new CompletableFuture<>();
+        connector.apply(request, new AsyncConnectorCallback() {
+            @Override
+            public void response(ClientResponse response) {
+                future.complete(response);
+            }
+
+            @Override
+            public void failure(Throwable failure) {
+                future.completeExceptionally(failure);
+            }
+        });
+        Thread.sleep(2 * requestTimeout);
+        String scenarioState =
+                
server.getAllScenarios().getScenarios().stream().filter(scenario -> 
"once".equals(scenario.getName()))
+                        .findFirst().get().getState();
+        assertEquals(scenarioState, "next");
+        assertTrue(future.isCompletedExceptionally());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-admin/src/test/resources/log4j2.xml 
b/pulsar-client-admin/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..9b57b450ffa
--- /dev/null
+++ b/pulsar-client-admin/src/test/resources/log4j2.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration xmlns="http://logging.apache.org/log4j/2.0/config";
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config 
https://logging.apache.org/log4j/2.0/log4j-core.xsd";>
+  <Appenders>
+    <!-- setting follow="true" is required for using ConsoleCaptor to validate 
log messages -->
+    <Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
+      <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+<!--    <Logger 
name="org.apache.pulsar.broker.service.persistent.PersistentTopic" 
level="DEBUG" additivity="false">-->
+<!--       <AppenderRef ref="CONSOLE" />-->
+<!--    </Logger>-->
+
+    <Root level="INFO">
+      <AppenderRef ref="CONSOLE"/>
+    </Root>
+    <Logger name="org.apache.pulsar.client.admin" level="DEBUG" />
+  </Loggers>
+</Configuration>
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index 07e8a8b5df6..8ff7f1c31ce 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -37,10 +37,10 @@ public abstract class CmdBase {
     private final Supplier<PulsarAdmin> adminSupplier;
 
     /**
-     * Default read timeout in milliseconds.
-     * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+     * Default request timeout in milliseconds.
+     * Used if not found from configuration data in {@link 
#getRequestTimeoutMs()}
      */
-    private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+    private static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000;
 
     public CmdBase(String cmdName, Supplier<PulsarAdmin> adminSupplier) {
         this.adminSupplier = adminSupplier;
@@ -56,17 +56,17 @@ public abstract class CmdBase {
         return adminSupplier.get();
     }
 
-    protected long getReadTimeoutMs() {
+    protected long getRequestTimeoutMs() {
         PulsarAdmin pulsarAdmin = getAdmin();
         if (pulsarAdmin instanceof PulsarAdminImpl) {
-            return ((PulsarAdminImpl) 
pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+            return ((PulsarAdminImpl) 
pulsarAdmin).getClientConfigData().getRequestTimeoutMs();
         }
-        return DEFAULT_READ_TIMEOUT_MILLIS;
+        return DEFAULT_REQUEST_TIMEOUT_MILLIS;
     }
 
     protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws 
PulsarAdminException {
         try {
-            return executor.get().get(getReadTimeoutMs(), 
TimeUnit.MILLISECONDS);
+            return executor.get().get(getRequestTimeoutMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);


Reply via email to