This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b01c96594a [fix][client] Fix timeout handling in Pulsar Admin client (#23128)
3b01c96594a is described below
commit 3b01c96594ae1af215018b1e1df29e5416f240d9
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Aug 7 08:22:25 2024 +0300
[fix][client] Fix timeout handling in Pulsar Admin client (#23128)
---
.../pulsar/client/admin/internal/BaseResource.java | 8 +-
.../pulsar/client/admin/internal/BookiesImpl.java | 4 +-
.../client/admin/internal/BrokerStatsImpl.java | 4 +-
.../pulsar/client/admin/internal/BrokersImpl.java | 4 +-
.../pulsar/client/admin/internal/ClustersImpl.java | 4 +-
.../client/admin/internal/ComponentResource.java | 4 +-
.../client/admin/internal/FunctionsImpl.java | 4 +-
.../client/admin/internal/NamespacesImpl.java | 4 +-
.../admin/internal/NonPersistentTopicsImpl.java | 4 +-
.../pulsar/client/admin/internal/PackagesImpl.java | 4 +-
.../client/admin/internal/ProxyStatsImpl.java | 4 +-
.../client/admin/internal/PulsarAdminImpl.java | 46 +++----
.../client/admin/internal/ResourceGroupsImpl.java | 4 +-
.../client/admin/internal/ResourceQuotasImpl.java | 4 +-
.../pulsar/client/admin/internal/SchemasImpl.java | 4 +-
.../pulsar/client/admin/internal/SinksImpl.java | 4 +-
.../pulsar/client/admin/internal/SourcesImpl.java | 4 +-
.../pulsar/client/admin/internal/TenantsImpl.java | 4 +-
.../pulsar/client/admin/internal/TopicsImpl.java | 4 +-
.../client/admin/internal/TransactionsImpl.java | 4 +-
.../pulsar/client/admin/internal/WorkerImpl.java | 4 +-
.../admin/internal/http/AsyncHttpConnector.java | 37 ++++--
.../internal/http/AsyncHttpConnectorTest.java | 140 +++++++++++++++++++++
pulsar-client-admin/src/test/resources/log4j2.xml | 41 ++++++
.../java/org/apache/pulsar/admin/cli/CmdBase.java | 14 +--
25 files changed, 280 insertions(+), 82 deletions(-)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 22550666cb6..ea39053c2ce 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -62,11 +62,11 @@ public abstract class BaseResource {
private static final Logger log =
LoggerFactory.getLogger(BaseResource.class);
protected final Authentication auth;
- protected final long readTimeoutMs;
+ protected final long requestTimeoutMs;
- protected BaseResource(Authentication auth, long readTimeoutMs) {
+ protected BaseResource(Authentication auth, long requestTimeoutMs) {
this.auth = auth;
- this.readTimeoutMs = readTimeoutMs;
+ this.requestTimeoutMs = requestTimeoutMs;
}
public Builder request(final WebTarget target) throws PulsarAdminException
{
@@ -339,7 +339,7 @@ public abstract class BaseResource {
protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws
PulsarAdminException {
try {
- return executor.get().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ return executor.get().get(this.requestTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
index 2286fb8c8a3..0bf92e02677 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -32,8 +32,8 @@ import
org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
public class BookiesImpl extends BaseResource implements Bookies {
private final WebTarget adminBookies;
- public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs)
{
- super(auth, readTimeoutMs);
+ public BookiesImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminBookies = web.path("/admin/v2/bookies");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
index e409d6f4492..6ddabe9837e 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
@@ -38,8 +38,8 @@ public class BrokerStatsImpl extends BaseResource implements
BrokerStats {
private final WebTarget adminBrokerStats;
private final WebTarget adminV2BrokerStats;
- public BrokerStatsImpl(WebTarget target, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public BrokerStatsImpl(WebTarget target, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminBrokerStats = target.path("/admin/broker-stats");
adminV2BrokerStats = target.path("/admin/v2/broker-stats");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 7b4ebb1778d..b82c3fd0f41 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -37,8 +37,8 @@ import org.apache.pulsar.common.util.Codec;
public class BrokersImpl extends BaseResource implements Brokers {
private final WebTarget adminBrokers;
- public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs)
{
- super(auth, readTimeoutMs);
+ public BrokersImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminBrokers = web.path("admin/v2/brokers");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 231d4506d61..24048ea3c0a 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -47,8 +47,8 @@ public class ClustersImpl extends BaseResource implements
Clusters {
private final WebTarget adminClusters;
- public ClustersImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public ClustersImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminClusters = web.path("/admin/v2/clusters");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
index 8beecff3897..0301f0fc2ee 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -37,8 +37,8 @@ import org.asynchttpclient.RequestBuilder;
*/
public class ComponentResource extends BaseResource {
- protected ComponentResource(Authentication auth, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ protected ComponentResource(Authentication auth, long requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
}
public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder
requestBuilder) throws PulsarAdminException {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index bb4cb0c1ef8..97c42e5c1a9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -72,8 +72,8 @@ public class FunctionsImpl extends ComponentResource
implements Functions {
private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;
- public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c7492a26ab3..7d41c7203d2 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -64,8 +64,8 @@ public class NamespacesImpl extends BaseResource implements
Namespaces {
private final WebTarget adminNamespaces;
private final WebTarget adminV2Namespaces;
- public NamespacesImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public NamespacesImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminNamespaces = web.path("/admin/namespaces");
adminV2Namespaces = web.path("/admin/v2/namespaces");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 76727cd1e0f..e98d44fdc4a 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -38,8 +38,8 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
private final WebTarget adminNonPersistentTopics;
private final WebTarget adminV2NonPersistentTopics;
- public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminNonPersistentTopics = web.path("/admin");
adminV2NonPersistentTopics = web.path("/admin/v2");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index 694c2160b0f..d69bef448c1 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -57,8 +57,8 @@ public class PackagesImpl extends ComponentResource
implements Packages {
private final WebTarget packages;
private final AsyncHttpClient httpClient;
- public PackagesImpl(WebTarget webTarget, Authentication auth,
AsyncHttpClient client, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public PackagesImpl(WebTarget webTarget, Authentication auth,
AsyncHttpClient client, long requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.httpClient = client;
this.packages = webTarget.path("/admin/v3/packages");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
index e98d9bf57b3..7ed07a1a6ad 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java
@@ -32,8 +32,8 @@ public class ProxyStatsImpl extends BaseResource implements
ProxyStats {
private final WebTarget adminProxyStats;
- public ProxyStatsImpl(WebTarget target, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public ProxyStatsImpl(WebTarget target, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminProxyStats = target.path("/proxy-stats");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index 39347850cf6..e00caa6dbbc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -159,29 +159,29 @@ public class PulsarAdminImpl implements PulsarAdmin {
Math.toIntExact(clientConfigData.getRequestTimeoutMs()),
clientConfigData.getAutoCertRefreshSeconds());
- long readTimeoutMs = clientConfigData.getReadTimeoutMs();
- this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
- this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
- this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
- this.proxyStats = new ProxyStatsImpl(root, auth, readTimeoutMs);
- this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
- this.resourcegroups = new ResourceGroupsImpl(root, auth,
readTimeoutMs);
- this.properties = new TenantsImpl(root, auth, readTimeoutMs);
- this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
- this.topics = new TopicsImpl(root, auth, readTimeoutMs);
- this.localTopicPolicies = new TopicPoliciesImpl(root, auth,
readTimeoutMs, false);
- this.globalTopicPolicies = new TopicPoliciesImpl(root, auth,
readTimeoutMs, true);
- this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth,
readTimeoutMs);
- this.resourceQuotas = new ResourceQuotasImpl(root, auth,
readTimeoutMs);
- this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs,
topics);
- this.functions = new FunctionsImpl(root, auth,
asyncHttpConnector.getHttpClient(), readTimeoutMs);
- this.sources = new SourcesImpl(root, auth,
asyncHttpConnector.getHttpClient(), readTimeoutMs);
- this.sinks = new SinksImpl(root, auth,
asyncHttpConnector.getHttpClient(), readTimeoutMs);
- this.worker = new WorkerImpl(root, auth, readTimeoutMs);
- this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
- this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
- this.packages = new PackagesImpl(root, auth,
asyncHttpConnector.getHttpClient(), readTimeoutMs);
- this.transactions = new TransactionsImpl(root, auth, readTimeoutMs);
+ long requestTimeoutMs = clientConfigData.getRequestTimeoutMs();
+ this.clusters = new ClustersImpl(root, auth, requestTimeoutMs);
+ this.brokers = new BrokersImpl(root, auth, requestTimeoutMs);
+ this.brokerStats = new BrokerStatsImpl(root, auth, requestTimeoutMs);
+ this.proxyStats = new ProxyStatsImpl(root, auth, requestTimeoutMs);
+ this.tenants = new TenantsImpl(root, auth, requestTimeoutMs);
+ this.resourcegroups = new ResourceGroupsImpl(root, auth,
requestTimeoutMs);
+ this.properties = new TenantsImpl(root, auth, requestTimeoutMs);
+ this.namespaces = new NamespacesImpl(root, auth, requestTimeoutMs);
+ this.topics = new TopicsImpl(root, auth, requestTimeoutMs);
+ this.localTopicPolicies = new TopicPoliciesImpl(root, auth,
requestTimeoutMs, false);
+ this.globalTopicPolicies = new TopicPoliciesImpl(root, auth,
requestTimeoutMs, true);
+ this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth,
requestTimeoutMs);
+ this.resourceQuotas = new ResourceQuotasImpl(root, auth,
requestTimeoutMs);
+ this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs,
topics);
+ this.functions = new FunctionsImpl(root, auth,
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+ this.sources = new SourcesImpl(root, auth,
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+ this.sinks = new SinksImpl(root, auth,
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+ this.worker = new WorkerImpl(root, auth, requestTimeoutMs);
+ this.schemas = new SchemasImpl(root, auth, requestTimeoutMs);
+ this.bookies = new BookiesImpl(root, auth, requestTimeoutMs);
+ this.packages = new PackagesImpl(root, auth,
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+ this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs);
if (originalCtxLoader != null) {
Thread.currentThread().setContextClassLoader(originalCtxLoader);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
index a8cef60232f..4e7230eebd9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.ResourceGroup;
public class ResourceGroupsImpl extends BaseResource implements ResourceGroups
{
private final WebTarget adminResourceGroups;
- public ResourceGroupsImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public ResourceGroupsImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminResourceGroups = web.path("/admin/v2/resourcegroups");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index 1e80c9eda94..68884d99448 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -33,8 +33,8 @@ public class ResourceQuotasImpl extends BaseResource
implements ResourceQuotas {
private final WebTarget adminQuotas;
private final WebTarget adminV2Quotas;
- public ResourceQuotasImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public ResourceQuotasImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminQuotas = web.path("/admin/resource-quotas");
adminV2Quotas = web.path("/admin/v2/resource-quotas");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 593eb67fc0d..28b435ab567 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -46,8 +46,8 @@ public class SchemasImpl extends BaseResource implements
Schemas {
private final WebTarget adminV2;
private final WebTarget adminV1;
- public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs)
{
- super(auth, readTimeoutMs);
+ public SchemasImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.adminV1 = web.path("/admin/schemas");
this.adminV2 = web.path("/admin/v2/schemas");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index c14f75ab367..a30f51264cc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -53,8 +53,8 @@ public class SinksImpl extends ComponentResource implements
Sinks, Sink {
private final WebTarget sink;
private final AsyncHttpClient asyncHttpClient;
- public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.sink = web.path("/admin/v3/sink");
this.asyncHttpClient = asyncHttpClient;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 6e5b84c7f04..8821ed61ce5 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -52,8 +52,8 @@ public class SourcesImpl extends ComponentResource implements
Sources, Source {
private final WebTarget source;
private final AsyncHttpClient asyncHttpClient;
- public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient
asyncHttpClient, long requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.source = web.path("/admin/v3/source");
this.asyncHttpClient = asyncHttpClient;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 9b70e39ec49..c12f3754b4a 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -34,8 +34,8 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
public class TenantsImpl extends BaseResource implements Tenants, Properties {
private final WebTarget adminTenants;
- public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs)
{
- super(auth, readTimeoutMs);
+ public TenantsImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminTenants = web.path("/admin/v2/tenants");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b7a8b876640..9c4a6eef753 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -137,8 +137,8 @@ public class TopicsImpl extends BaseResource implements
Topics {
public static final String PROPERTY_SHADOW_SOURCE_KEY =
"PULSAR.SHADOW_SOURCE";
- public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public TopicsImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminTopics = web.path("/admin");
adminV2Topics = web.path("/admin/v2");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 460478787eb..a0b9dd234d9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -46,8 +46,8 @@ import
org.apache.pulsar.common.stats.PositionInPendingAckStats;
public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
- public TransactionsImpl(WebTarget web, Authentication auth, long
readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public TransactionsImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
adminV3Transactions = web.path("/admin/v3/transactions");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 60b1226d581..12a691edb08 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -40,8 +40,8 @@ public class WorkerImpl extends BaseResource implements
Worker {
private final WebTarget workerStats;
private final WebTarget worker;
- public WorkerImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
- super(auth, readTimeoutMs);
+ public WorkerImpl(WebTarget web, Authentication auth, long
requestTimeoutMs) {
+ super(auth, requestTimeoutMs);
this.worker = web.path("/admin/v2/worker");
this.workerStats = web.path("/admin/v2/worker-stats");
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 9ad0ce5029c..a0569c391ad 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -35,7 +35,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
@@ -59,6 +58,7 @@ import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
@@ -74,11 +74,11 @@ import org.glassfish.jersey.client.spi.Connector;
*/
@Slf4j
public class AsyncHttpConnector implements Connector {
- private static final TimeoutException READ_TIMEOUT_EXCEPTION =
- FutureUtil.createTimeoutException("Read timeout",
AsyncHttpConnector.class, "retryOrTimeout(...)");
+ private static final TimeoutException REQUEST_TIMEOUT_EXCEPTION =
+ FutureUtil.createTimeoutException("Request timeout",
AsyncHttpConnector.class, "retryOrTimeout(...)");
@Getter
private final AsyncHttpClient httpClient;
- private final Duration readTimeout;
+ private final Duration requestTimeout;
private final int maxRetries;
private final PulsarServiceNameResolver serviceNameResolver;
private final ScheduledExecutorService delayer =
Executors.newScheduledThreadPool(1,
@@ -185,7 +185,7 @@ public class AsyncHttpConnector implements Connector {
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
- this.readTimeout = Duration.ofMillis(readTimeoutMs);
+ this.requestTimeout = requestTimeoutMs > 0 ?
Duration.ofMillis(requestTimeoutMs) : null;
this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
}
@@ -264,9 +264,10 @@ public class AsyncHttpConnector implements Connector {
private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
final CompletableFuture<Response> resultFuture = new
CompletableFuture<>();
retryOperation(resultFuture, () ->
oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
- CompletableFuture<Response> timeoutAfter =
FutureUtil.createFutureWithTimeout(readTimeout, delayer,
- () -> READ_TIMEOUT_EXCEPTION);
- return resultFuture.applyToEither(timeoutAfter, Function.identity());
+ if (requestTimeout != null) {
+ FutureUtil.addTimeoutHandling(resultFuture, requestTimeout,
delayer, () -> REQUEST_TIMEOUT_EXCEPTION);
+ }
+ return resultFuture;
}
private <T> void retryOperation(
@@ -285,11 +286,18 @@ public class AsyncHttpConnector implements Connector {
new RetryException("Operation future
was cancelled.", throwable));
} else {
if (retries > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Retrying operation.
Remaining retries: {}", retries);
+ }
retryOperation(
resultFuture,
operation,
retries - 1);
} else {
+ if (log.isDebugEnabled()) {
+ log.debug("Number of retries has been
exhausted. Failing the operation.",
+ throwable);
+ }
resultFuture.completeExceptionally(
new RetryException("Could not complete
the operation. Number of retries "
+ "has been exhausted. Failed
reason: " + throwable.getMessage(),
@@ -315,7 +323,7 @@ public class AsyncHttpConnector implements Connector {
}
}
- private CompletableFuture<Response> oneShot(InetSocketAddress host,
ClientRequest request) {
+ protected CompletableFuture<Response> oneShot(InetSocketAddress host,
ClientRequest request) {
ClientRequest currentRequest = new ClientRequest(request);
URI newUri = replaceWithNew(host, currentRequest.getUri());
currentRequest.setUri(newUri);
@@ -347,7 +355,16 @@ public class AsyncHttpConnector implements Connector {
builder.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
}
- return builder.execute().toCompletableFuture();
+ ListenableFuture<Response> responseFuture = builder.execute();
+ CompletableFuture<Response> completableFuture =
responseFuture.toCompletableFuture();
+ completableFuture.whenComplete((response, throwable) -> {
+ if (throwable != null && (throwable instanceof
CancellationException
+ || throwable instanceof TimeoutException)) {
+ // abort the request if the future is cancelled or timed out
+ responseFuture.abort(throwable);
+ }
+ });
+ return completableFuture;
}
@Override
diff --git
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
new file mode 100644
index 00000000000..dd3fb40ae9a
--- /dev/null
+++
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.stubbing.Scenario;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.Response;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.internal.PropertiesDelegate;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class AsyncHttpConnectorTest { // Exercises AsyncHttpConnector against a local WireMock HTTP server.
+ WireMockServer server;
+
+ @BeforeClass(alwaysRun = true)
+ void beforeClass() throws IOException {
+ server = new WireMockServer(WireMockConfiguration.wireMockConfig()
+ .port(0)); // port 0: presumably asks for a dynamically assigned port (read back via server.port() in the test) - confirm
+ server.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ if (server != null) { // start-up may have failed before the field was assigned
+ server.stop();
+ }
+ }
+
+ static class TestClientRequest extends ClientRequest { // test-only subclass; presumably needed because ClientRequest's constructor is not public - confirm
+ public TestClientRequest(URI uri, ClientConfig clientConfig,
PropertiesDelegate propertiesDelegate) {
+ super(uri, clientConfig, propertiesDelegate);
+ }
+ }
+
+ @Test
+ public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException,
ExecutionException, InterruptedException {
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .inScenario("once")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .willSetStateTo("next")
+ .willReturn(aResponse()
+ .withHeader("Content-Type", "application/json")
+ .withBody("[\"test-cluster\"]"))); // first request: answered with the JSON body, scenario advances to "next"
+
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .inScenario("once")
+ .whenScenarioStateIs("next")
+ .willSetStateTo("retried")
+ .willReturn(aResponse().withStatus(500))); // a second request would get a 500 and advance the scenario to "retried"
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("http://localhost:" + server.port());
+
+ int requestTimeout = 500; // milliseconds, see the TimeUnit.MILLISECONDS schedule below
+
+ @Cleanup("shutdownNow") // Lombok: calls scheduledExecutor.shutdownNow() when the variable goes out of scope
+ ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+ Executor delayedExecutor = runnable -> {
+ scheduledExecutor.schedule(runnable, requestTimeout,
TimeUnit.MILLISECONDS);
+ }; // executor that runs each submitted task only after requestTimeout ms
+ @Cleanup
+ AsyncHttpConnector connector = new AsyncHttpConnector(5000,
requestTimeout,
+ requestTimeout, 0, conf, false) {
+ @Override
+ protected CompletableFuture<Response> oneShot(InetSocketAddress
host, ClientRequest request) {
+ // delay the response to simulate a timeout
+ return super.oneShot(host, request)
+ .thenApplyAsync(response -> {
+ return response;
+ }, delayedExecutor); // identity mapping; its only purpose is to complete the returned future via the delayed executor
+ }
+ };
+
+ JerseyClient jerseyClient = JerseyClientBuilder.createClient();
+ ClientConfig clientConfig = jerseyClient.getConfiguration();
+ PropertiesDelegate propertiesDelegate = new MapPropertiesDelegate();
+ URI requestUri = URI.create("http://localhost:" + server.port() +
"/admin/v2/clusters");
+ ClientRequest request = new TestClientRequest(requestUri,
clientConfig, propertiesDelegate);
+ request.setMethod("GET");
+ CompletableFuture<ClientResponse> future = new CompletableFuture<>();
+ connector.apply(request, new AsyncConnectorCallback() {
+ @Override
+ public void response(ClientResponse response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void failure(Throwable failure) {
+ future.completeExceptionally(failure);
+ }
+ }); // bridges the connector callback to a CompletableFuture so the outcome can be asserted
+ Thread.sleep(2 * requestTimeout); // wait twice the request timeout before inspecting the outcome
+ String scenarioState =
+
server.getAllScenarios().getScenarios().stream().filter(scenario ->
"once".equals(scenario.getName()))
+ .findFirst().get().getState();
+ assertEquals(scenarioState, "next"); // "next" (not "retried") means the server saw exactly one request, i.e. no retry was sent
+ assertTrue(future.isCompletedExceptionally()); // the callback must have been notified of a failure
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-admin/src/test/resources/log4j2.xml
b/pulsar-client-admin/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..9b57b450ffa
--- /dev/null
+++ b/pulsar-client-admin/src/test/resources/log4j2.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config
https://logging.apache.org/log4j/2.0/log4j-core.xsd">
+ <Appenders>
+ <!-- setting follow="true" is required for using ConsoleCaptor to validate
log messages -->
+ <Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
+ <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+<!-- <Logger
name="org.apache.pulsar.broker.service.persistent.PersistentTopic"
level="DEBUG" additivity="false">-->
+<!-- <AppenderRef ref="CONSOLE" />-->
+<!-- </Logger>-->
+
+ <Root level="INFO"> <!-- default level for every logger without a more specific entry -->
+ <AppenderRef ref="CONSOLE"/>
+ </Root>
+ <Logger name="org.apache.pulsar.client.admin" level="DEBUG" /> <!-- DEBUG for admin client loggers; has no AppenderRef of its own, so output presumably reaches CONSOLE through Root (default additivity) -->
+ </Loggers>
+</Configuration>
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index 07e8a8b5df6..8ff7f1c31ce 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -37,10 +37,10 @@ public abstract class CmdBase {
private final Supplier<PulsarAdmin> adminSupplier;
/**
- * Default read timeout in milliseconds.
- * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+ * Default request timeout in milliseconds.
+ * Fallback returned by {@link #getRequestTimeoutMs()} when the admin client is not a {@link PulsarAdminImpl}.
*/
- private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+ private static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000;
public CmdBase(String cmdName, Supplier<PulsarAdmin> adminSupplier) {
this.adminSupplier = adminSupplier;
@@ -56,17 +56,17 @@ public abstract class CmdBase {
return adminSupplier.get();
}
- protected long getReadTimeoutMs() {
+ protected long getRequestTimeoutMs() {
PulsarAdmin pulsarAdmin = getAdmin();
if (pulsarAdmin instanceof PulsarAdminImpl) {
- return ((PulsarAdminImpl)
pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+ return ((PulsarAdminImpl)
pulsarAdmin).getClientConfigData().getRequestTimeoutMs();
}
- return DEFAULT_READ_TIMEOUT_MILLIS;
+ return DEFAULT_REQUEST_TIMEOUT_MILLIS;
}
protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws
PulsarAdminException {
try {
- return executor.get().get(getReadTimeoutMs(),
TimeUnit.MILLISECONDS);
+ return executor.get().get(getRequestTimeoutMs(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);