abhilashmandaliya commented on a change in pull request #10028:
URL: https://github.com/apache/pulsar/pull/10028#discussion_r600183667
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -136,38 +136,44 @@ public PulsarClientImpl(ClientConfigurationData conf,
EventLoopGroup eventLoopGr
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup ==
null) {
- throw new
PulsarClientException.InvalidConfigurationException("Invalid client
configuration");
- }
- this.eventLoopGroup = eventLoopGroup;
- setAuth(conf);
- this.conf = conf;
- this.clientClock = conf.getClock();
- conf.getAuthentication().start();
- this.cnxPool = cnxPool;
- externalExecutorProvider = new
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
- internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-internal");
- if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(conf, eventLoopGroup);
- } else {
- lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(),
conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
- }
- timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
- producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
- consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- if (conf.isEnableTransaction()) {
- tcClient = new TransactionCoordinatorClientImpl(this);
- try {
- tcClient.start();
- } catch (Throwable e) {
- log.error("Start transactionCoordinatorClient error.", e);
- throw new PulsarClientException(e);
+ try {
+ if (conf == null || isBlank(conf.getServiceUrl()) ||
eventLoopGroup == null) {
+ throw new
PulsarClientException.InvalidConfigurationException("Invalid client
configuration");
+ }
+ this.eventLoopGroup = eventLoopGroup;
+ setAuth(conf);
+ this.conf = conf;
+ this.clientClock = conf.getClock();
+ conf.getAuthentication().start();
+ this.cnxPool = cnxPool;
+ externalExecutorProvider = new
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+ internalExecutorService = new
ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ if (conf.getServiceUrl().startsWith("http")) {
+ lookup = new HttpLookupService(conf, eventLoopGroup);
+ } else {
+ lookup = new BinaryProtoLookupService(this,
conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
+ }
+ timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
+ producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ if (conf.isEnableTransaction()) {
+ tcClient = new TransactionCoordinatorClientImpl(this);
+ try {
+ tcClient.start();
+ } catch (Throwable e) {
+ log.error("Start transactionCoordinatorClient error.", e);
+ throw new PulsarClientException(e);
+ }
}
- }
- memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes());
- state.set(State.Open);
+ memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes());
+ state.set(State.Open);
+ } catch (Throwable t) {
+ shutdown();
+ shutdownEventLoopGroup(eventLoopGroup);
Review comment:
This call is for the eventLoopGroup passed as a constructor argument. If
an exception occurs before we assign it to our class member, we will need to
close the parameter ourselves. Once it is assigned, the parameter and the
member refer to the same object, and hence I have added a check for whether
it has already been shut down.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]