This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f2532c8  fix #7437 (#7687)
f2532c8 is described below

commit f2532c858de72150453c1956753d71b1e6f4ab3a
Author: rushsky518 <[email protected]>
AuthorDate: Wed Jul 29 14:34:10 2020 +0800

    fix #7437 (#7687)
---
 .../org/apache/pulsar/client/admin/PulsarAdmin.java  | 20 ++++++++------------
 .../admin/internal/http/AsyncHttpConnector.java      |  1 +
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 561e72f..c997cd8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -47,13 +47,13 @@ import org.apache.pulsar.client.admin.internal.SourcesImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.admin.internal.WorkerImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
 import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.asynchttpclient.AsyncHttpClient;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.jackson.JacksonFeature;
@@ -86,7 +86,7 @@ public class PulsarAdmin implements Closeable {
     private final ResourceQuotas resourceQuotas;
     private final ClientConfigurationData clientConfigData;
     private final Client client;
-    private final AsyncHttpClient httpAsyncClient;
+    private final AsyncHttpConnector asyncHttpConnector;
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
@@ -184,10 +184,10 @@ public class PulsarAdmin implements Closeable {
         this.serviceUrl = serviceUrl;
         root = client.target(serviceUrl);
 
-        this.httpAsyncClient = asyncConnectorProvider.getConnector(
+        this.asyncHttpConnector = asyncConnectorProvider.getConnector(
                 Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
                 Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
-                Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient();
+                Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)));
 
         long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
         this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
@@ -201,9 +201,9 @@ public class PulsarAdmin implements Closeable {
         this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs);
         this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs);
         this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs);
-        this.functions = new FunctionsImpl(root, auth, httpAsyncClient, readTimeoutMs);
-        this.sources = new SourcesImpl(root, auth, httpAsyncClient, readTimeoutMs);
-        this.sinks = new SinksImpl(root, auth, httpAsyncClient, readTimeoutMs);
+        this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
+        this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
+        this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
         this.worker = new WorkerImpl(root, auth, readTimeoutMs);
         this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
         this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
@@ -433,10 +433,6 @@ public class PulsarAdmin implements Closeable {
         }
         client.close();
 
-        try {
-            httpAsyncClient.close();
-        } catch (IOException e) {
-           LOG.error("Failed to close http async client", e);
-        }
+        asyncHttpConnector.close();
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index c0dc467..70373fb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -310,6 +310,7 @@ public class AsyncHttpConnector implements Connector {
     public void close() {
         try {
             httpClient.close();
+            delayer.shutdownNow();
         } catch (IOException e) {
             log.warn("Failed to close http client", e);
         }

Reply via email to