MarvinCai commented on a change in pull request #10028:
URL: https://github.com/apache/pulsar/pull/10028#discussion_r600180102
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -669,18 +675,105 @@ public void close() throws PulsarClientException {
@Override
public void shutdown() throws PulsarClientException {
try {
- lookup.close();
- cnxPool.close();
- timer.stop();
- externalExecutorProvider.shutdownNow();
- internalExecutorService.shutdownNow();
- conf.getAuthentication().close();
+ // We will throw the last thrown exception only, though logging
all of them.
+ Throwable throwable = null;
+ if (lookup != null) {
+ try {
+ lookup.close();
+ } catch (Throwable t) {
+ log.warn("Failed to shutdown lookup", t);
+ throwable = t;
+ }
+ }
+ try {
+ // Shutting down eventLoopGroup separately because in some
cases, cnxPool might be using different
+ // eventLoopGroup.
+ shutdownEventLoopGroup(eventLoopGroup);
Review comment:
If connectionPool and PulsarClient is using the same eventLoopGroup,
this might cause call shutDown on it multiple times, not sure what will happen
in this situation? Probably move it to after closing `cnxPool`?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -136,38 +136,44 @@ public PulsarClientImpl(ClientConfigurationData conf,
EventLoopGroup eventLoopGr
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup ==
null) {
- throw new
PulsarClientException.InvalidConfigurationException("Invalid client
configuration");
- }
- this.eventLoopGroup = eventLoopGroup;
- setAuth(conf);
- this.conf = conf;
- this.clientClock = conf.getClock();
- conf.getAuthentication().start();
- this.cnxPool = cnxPool;
- externalExecutorProvider = new
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
- internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-internal");
- if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(conf, eventLoopGroup);
- } else {
- lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(),
conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
- }
- timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
- producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
- consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- if (conf.isEnableTransaction()) {
- tcClient = new TransactionCoordinatorClientImpl(this);
- try {
- tcClient.start();
- } catch (Throwable e) {
- log.error("Start transactionCoordinatorClient error.", e);
- throw new PulsarClientException(e);
+ try {
+ if (conf == null || isBlank(conf.getServiceUrl()) ||
eventLoopGroup == null) {
+ throw new
PulsarClientException.InvalidConfigurationException("Invalid client
configuration");
+ }
+ this.eventLoopGroup = eventLoopGroup;
+ setAuth(conf);
+ this.conf = conf;
+ this.clientClock = conf.getClock();
+ conf.getAuthentication().start();
+ this.cnxPool = cnxPool;
+ externalExecutorProvider = new
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+ internalExecutorService = new
ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ if (conf.getServiceUrl().startsWith("http")) {
+ lookup = new HttpLookupService(conf, eventLoopGroup);
+ } else {
+ lookup = new BinaryProtoLookupService(this,
conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
+ }
+ timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
+ producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ if (conf.isEnableTransaction()) {
+ tcClient = new TransactionCoordinatorClientImpl(this);
+ try {
+ tcClient.start();
+ } catch (Throwable e) {
+ log.error("Start transactionCoordinatorClient error.", e);
+ throw new PulsarClientException(e);
+ }
}
- }
- memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes());
- state.set(State.Open);
+ memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes());
+ state.set(State.Open);
+ } catch (Throwable t) {
+ shutdown();
+ shutdownEventLoopGroup(eventLoopGroup);
Review comment:
Is this redundant? We already call
`shutdownEventLoopGroup(eventLoopGroup)` in `shutDown()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]