MarvinCai commented on a change in pull request #10028:
URL: https://github.com/apache/pulsar/pull/10028#discussion_r600180102



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -669,18 +675,105 @@ public void close() throws PulsarClientException {
     @Override
     public void shutdown() throws PulsarClientException {
         try {
-            lookup.close();
-            cnxPool.close();
-            timer.stop();
-            externalExecutorProvider.shutdownNow();
-            internalExecutorService.shutdownNow();
-            conf.getAuthentication().close();
+            // We will throw the last thrown exception only, though logging all of them.
+            Throwable throwable = null;
+            if (lookup != null) {
+                try {
+                    lookup.close();
+                } catch (Throwable t) {
+                    log.warn("Failed to shutdown lookup", t);
+                    throwable = t;
+                }
+            }
+            try {
+                // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different
+                // eventLoopGroup.
+                shutdownEventLoopGroup(eventLoopGroup);

Review comment:
       If the connection pool and the PulsarClient are using the same
eventLoopGroup, this might shut it down more than once. I'm not sure what
happens in that situation. Could we move this call to after closing `cnxPool`?
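
One way to make the double-shutdown concern moot is to guard the call so that a shared group is shut down at most once, regardless of which close path reaches it first. The sketch below is only an illustration under stated assumptions: `OnceOnlyShutdown` and `shutdownOnce` are hypothetical names that do not exist in Pulsar, and a plain JDK `ExecutorService` stands in for Netty's `EventLoopGroup` so the example runs without extra dependencies.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of Pulsar): shuts each executor down at most once.
final class OnceOnlyShutdown {
    // Identity-based set, so two references to the same shared group count as one.
    private final Set<ExecutorService> alreadyShutDown =
            Collections.newSetFromMap(new IdentityHashMap<>());
    private int shutdownCalls = 0;

    // Returns true only if this call actually triggered the shutdown.
    synchronized boolean shutdownOnce(ExecutorService executor) {
        if (executor == null || !alreadyShutDown.add(executor)) {
            return false; // nothing to do, or already shut down by an earlier caller
        }
        shutdownCalls++;
        executor.shutdown();
        return true;
    }

    synchronized int shutdownCalls() {
        return shutdownCalls;
    }

    public static void main(String[] args) {
        // Stand-in for an event loop group shared by the client and the pool.
        ExecutorService shared = Executors.newSingleThreadExecutor();
        OnceOnlyShutdown closer = new OnceOnlyShutdown();
        closer.shutdownOnce(shared); // e.g. reached from the pool's close path
        closer.shutdownOnce(shared); // e.g. reached from the client's shutdown path
        System.out.println("shutdown calls: " + closer.shutdownCalls());
    }
}
```

With a guard like this the ordering relative to `cnxPool.close()` matters less, although moving the call after closing `cnxPool` may still be the simpler change.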

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -136,38 +136,44 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
-            throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
-        }
-        this.eventLoopGroup = eventLoopGroup;
-        setAuth(conf);
-        this.conf = conf;
-        this.clientClock = conf.getClock();
-        conf.getAuthentication().start();
-        this.cnxPool = cnxPool;
-        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-        internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
-        if (conf.getServiceUrl().startsWith("http")) {
-            lookup = new HttpLookupService(conf, eventLoopGroup);
-        } else {
-            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
-        }
-        timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
-        producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-        consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-        if (conf.isEnableTransaction()) {
-            tcClient = new TransactionCoordinatorClientImpl(this);
-            try {
-                tcClient.start();
-            } catch (Throwable e) {
-                log.error("Start transactionCoordinatorClient error.", e);
-                throw new PulsarClientException(e);
+        try {
+            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+                throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
+            }
+            this.eventLoopGroup = eventLoopGroup;
+            setAuth(conf);
+            this.conf = conf;
+            this.clientClock = conf.getClock();
+            conf.getAuthentication().start();
+            this.cnxPool = cnxPool;
+            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            if (conf.getServiceUrl().startsWith("http")) {
+                lookup = new HttpLookupService(conf, eventLoopGroup);
+            } else {
+                lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
+            }
+            timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
+            producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+            consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+            if (conf.isEnableTransaction()) {
+                tcClient = new TransactionCoordinatorClientImpl(this);
+                try {
+                    tcClient.start();
+                } catch (Throwable e) {
+                    log.error("Start transactionCoordinatorClient error.", e);
+                    throw new PulsarClientException(e);
+                }
             }
-        }
 
-        memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
-        state.set(State.Open);
+            memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
+            state.set(State.Open);
+            state.set(State.Open);
+        } catch (Throwable t) {
+            shutdown();
+            shutdownEventLoopGroup(eventLoopGroup);

Review comment:
       Is this redundant? We already call
`shutdownEventLoopGroup(eventLoopGroup)` in `shutdown()`.
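
To illustrate the point, here is a minimal sketch of a constructor whose failure path delegates to a single null-safe cleanup method, so no resource needs a second explicit shutdown call. Everything in it is hypothetical: `PartialInitClient` and `Resource` are stand-ins invented for this example, not Pulsar classes, and whether the client should own and close the event loop at all is an assumption made only to mirror the diff above.

```java
// Hypothetical stand-in for a client that may fail halfway through construction.
final class PartialInitClient implements AutoCloseable {

    // Minimal resource that counts how many times it has been closed.
    static final class Resource implements AutoCloseable {
        int closeCount = 0;

        @Override
        public void close() {
            closeCount++;
        }
    }

    private final Resource eventLoop;
    private Resource lookup;
    private Resource timer;

    PartialInitClient(Resource eventLoop, boolean failMidway) {
        this.eventLoop = eventLoop;
        try {
            lookup = new Resource();
            if (failMidway) {
                throw new IllegalStateException("simulated init failure");
            }
            timer = new Resource();
        } catch (RuntimeException e) {
            // Single cleanup path: close() already covers the event loop,
            // so no additional explicit shutdown of it is needed here.
            close();
            throw e;
        }
    }

    @Override
    public void close() {
        // Null checks let this run safely on a partially initialized instance.
        closeIfPresent(lookup);
        closeIfPresent(timer);
        closeIfPresent(eventLoop);
    }

    private static void closeIfPresent(Resource resource) {
        if (resource != null) {
            resource.close();
        }
    }
}
```

Constructing `new PartialInitClient(loop, true)` throws, and `loop` ends up closed exactly once, which is the behavior the extra call in the `catch` block would otherwise duplicate.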



