This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f2532c8 fix #7437 (#7687)
f2532c8 is described below
commit f2532c858de72150453c1956753d71b1e6f4ab3a
Author: rushsky518 <[email protected]>
AuthorDate: Wed Jul 29 14:34:10 2020 +0800
fix #7437 (#7687)
---
.../org/apache/pulsar/client/admin/PulsarAdmin.java | 20 ++++++++------------
.../admin/internal/http/AsyncHttpConnector.java | 1 +
2 files changed, 9 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 561e72f..c997cd8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -47,13 +47,13 @@ import org.apache.pulsar.client.admin.internal.SourcesImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.asynchttpclient.AsyncHttpClient;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
@@ -86,7 +86,7 @@ public class PulsarAdmin implements Closeable {
private final ResourceQuotas resourceQuotas;
private final ClientConfigurationData clientConfigData;
private final Client client;
- private final AsyncHttpClient httpAsyncClient;
+ private final AsyncHttpConnector asyncHttpConnector;
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
@@ -184,10 +184,10 @@ public class PulsarAdmin implements Closeable {
this.serviceUrl = serviceUrl;
root = client.target(serviceUrl);
- this.httpAsyncClient = asyncConnectorProvider.getConnector(
+ this.asyncHttpConnector = asyncConnectorProvider.getConnector(
Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
- Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient();
+ Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)));
long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
@@ -201,9 +201,9 @@ public class PulsarAdmin implements Closeable {
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs);
this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs);
this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs);
- this.functions = new FunctionsImpl(root, auth, httpAsyncClient, readTimeoutMs);
- this.sources = new SourcesImpl(root, auth, httpAsyncClient, readTimeoutMs);
- this.sinks = new SinksImpl(root, auth, httpAsyncClient, readTimeoutMs);
+ this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
+ this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
+ this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs);
this.worker = new WorkerImpl(root, auth, readTimeoutMs);
this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
@@ -433,10 +433,6 @@ public class PulsarAdmin implements Closeable {
}
client.close();
- try {
- httpAsyncClient.close();
- } catch (IOException e) {
- LOG.error("Failed to close http async client", e);
- }
+ asyncHttpConnector.close();
}
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index c0dc467..70373fb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -310,6 +310,7 @@ public class AsyncHttpConnector implements Connector {
public void close() {
try {
httpClient.close();
+ delayer.shutdownNow();
} catch (IOException e) {
log.warn("Failed to close http client", e);
}