abhilashmandaliya commented on a change in pull request #10028:
URL: https://github.com/apache/pulsar/pull/10028#discussion_r600183667



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -136,38 +136,44 @@ public PulsarClientImpl(ClientConfigurationData conf, 
EventLoopGroup eventLoopGr
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == 
null) {
-            throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
-        }
-        this.eventLoopGroup = eventLoopGroup;
-        setAuth(conf);
-        this.conf = conf;
-        this.clientClock = conf.getClock();
-        conf.getAuthentication().start();
-        this.cnxPool = cnxPool;
-        externalExecutorProvider = new 
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-        internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");
-        if (conf.getServiceUrl().startsWith("http")) {
-            lookup = new HttpLookupService(conf, eventLoopGroup);
-        } else {
-            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), 
conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
-        }
-        timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
-        producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-        consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-        if (conf.isEnableTransaction()) {
-            tcClient = new TransactionCoordinatorClientImpl(this);
-            try {
-                tcClient.start();
-            } catch (Throwable e) {
-                log.error("Start transactionCoordinatorClient error.", e);
-                throw new PulsarClientException(e);
+        try {
+            if (conf == null || isBlank(conf.getServiceUrl()) || 
eventLoopGroup == null) {
+                throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
+            }
+            this.eventLoopGroup = eventLoopGroup;
+            setAuth(conf);
+            this.conf = conf;
+            this.clientClock = conf.getClock();
+            conf.getAuthentication().start();
+            this.cnxPool = cnxPool;
+            externalExecutorProvider = new 
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            internalExecutorService = new 
ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            if (conf.getServiceUrl().startsWith("http")) {
+                lookup = new HttpLookupService(conf, eventLoopGroup);
+            } else {
+                lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
+            }
+            timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
+            producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+            consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+            if (conf.isEnableTransaction()) {
+                tcClient = new TransactionCoordinatorClientImpl(this);
+                try {
+                    tcClient.start();
+                } catch (Throwable e) {
+                    log.error("Start transactionCoordinatorClient error.", e);
+                    throw new PulsarClientException(e);
+                }
             }
-        }
 
-        memoryLimitController = new 
MemoryLimitController(conf.getMemoryLimitBytes());
-        state.set(State.Open);
+            memoryLimitController = new 
MemoryLimitController(conf.getMemoryLimitBytes());
+            state.set(State.Open);
+        } catch (Throwable t) {
+            shutdown();
+            shutdownEventLoopGroup(eventLoopGroup);

Review comment:
       this call is for the eventLoopGroup passed as a constructor argument. if 
an exception occurs before we assign it to our class member, we will need to 
close the parameter. once assigned, they will be the same and hence I have 
added a check whether it's already shutdown or not




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to