This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new cf06952b9b9 [improve][broker] Add UncaughtExceptionHandler for every
thread pool (#18211)
cf06952b9b9 is described below
commit cf06952b9b9994615fc9f1a7031606d83e1d5d85
Author: feynmanlin <[email protected]>
AuthorDate: Thu Oct 27 11:29:53 2022 +0800
[improve][broker] Add UncaughtExceptionHandler for every thread pool
(#18211)
(cherry picked from commit 5b7c5c62965151c35d9e5b9f0b50bb93b0beb2c3)
---
.../org/apache/pulsar/broker/PulsarService.java | 11 ++++---
.../broker/TransactionMetadataStoreService.java | 4 +--
.../loadbalance/impl/ModularLoadManagerImpl.java | 7 +++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +--
.../pulsar/broker/service/BrokerService.java | 36 ++++++++++++----------
.../metrics/PrometheusMetricsProvider.java | 5 +--
.../pulsar/broker/tools/LoadReportCommand.java | 4 ++-
.../pulsar/client/impl/AutoClusterFailover.java | 4 +--
.../client/impl/ControlledClusterFailover.java | 4 +--
.../pulsar/client/impl/PulsarClientImpl.java | 3 +-
.../pulsar/client/util/ExecutorProvider.java | 11 ++++---
.../pulsar/functions/instance/InstanceCache.java | 3 +-
.../worker/ClusterServiceCoordinator.java | 15 +++++++++
13 files changed, 68 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c239e73bf5..2bda5c55398 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -310,13 +310,13 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.config = config;
this.processTerminator = processTerminator;
this.loadManagerExecutor = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-load-manager"));
+ .newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
this.workerConfig = workerConfig;
this.functionWorkerService = functionWorkerService;
this.executor =
Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
- new DefaultThreadFactory("pulsar"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar"));
this.cacheExecutor =
Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
- new DefaultThreadFactory("zk-cache-callback"));
+ new
ExecutorProvider.ExtendedThreadFactory("zk-cache-callback"));
if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutorProvider = new
ExecutorProvider(this.getConfiguration()
@@ -594,7 +594,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void>
future) {
ScheduledExecutorService shutdownExecutor =
Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory(getClass().getSimpleName() +
"-shutdown"));
+ new
ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() +
"-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L,
getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () ->
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
@@ -1337,7 +1337,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
- compactorExecutor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("compaction"));
+ compactorExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("compaction"));
}
return this.compactorExecutor;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index ca33f1cd3d1..8a3c47d94ff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,7 +24,6 @@ import static
org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
@@ -53,6 +52,7 @@ import
org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -95,7 +95,7 @@ public class TransactionMetadataStoreService {
private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
private final ThreadFactory threadFactory =
- new DefaultThreadFactory("transaction-coordinator-thread-factory");
+ new
ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
public TransactionMetadataStoreService(TransactionMetadataStoreProvider
transactionMetadataStoreProvider,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 10b6cb86534..1beb8ec7415 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -66,6 +65,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -215,8 +215,9 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
preallocatedBundleToBroker = new ConcurrentHashMap<>();
- scheduler = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-modular-load-manager"));
- this.brokerToFailureDomainMap = Maps.newHashMap();
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
+ this.brokerToFailureDomainMap = new HashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 06c87cba627..c21d939e017 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
@@ -55,6 +54,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -189,7 +189,7 @@ public class SimpleLoadManagerImpl implements LoadManager,
Consumer<Notification
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-simple-load-manager"));
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-simple-load-manager"));
this.sortedRankings.set(new TreeMap<>());
this.currentLoadReports = new HashMap<>();
this.resourceUnitRankings = new HashMap<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ecc1312ac8f..b8ae31267f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -130,6 +130,7 @@ import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.common.configuration.FieldContext;
@@ -311,13 +312,14 @@ public class BrokerService implements Closeable {
this.topicOrderedExecutor = OrderedExecutor.newBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
.name("broker-topic-workers").build();
- final DefaultThreadFactory acceptorThreadFactory = new
DefaultThreadFactory("pulsar-acceptor");
+ final DefaultThreadFactory acceptorThreadFactory =
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-acceptor");
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false,
acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
- this.statsUpdater = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-stats-updater"));
+ this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
@@ -327,19 +329,18 @@ public class BrokerService implements Closeable {
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
- this.inactivityMonitor = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-inactivity-monitor"));
- this.messageExpiryMonitor = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-msg-expiry-monitor"));
+ this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-inactivity-monitor"));
+ this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-msg-expiry-monitor"));
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-compaction-monitor"));
- this.consumedLedgersMonitor = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("consumed-Ledgers-monitor"));
-
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-compaction-monitor"));
+ this.consumedLedgersMonitor =
Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("consumed-Ledgers-monitor"));
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
- this.backlogQuotaChecker = Executors
- .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-backlog-quota-checker"));
+ this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new
AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
@@ -424,7 +425,8 @@ public class BrokerService implements Closeable {
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024
* 1024));
EventLoopUtil.enableTriggeredMode(bootstrap);
- DefaultThreadFactory defaultThreadFactory = new
DefaultThreadFactory("pulsar-ph-" + protocol);
+ DefaultThreadFactory defaultThreadFactory =
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" +
protocol);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false,
defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
@@ -528,7 +530,7 @@ public class BrokerService implements Closeable {
int interval =
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
if (interval > 0 &&
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
this.deduplicationSnapshotMonitor =
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory(
+ Executors.newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory(
"deduplication-snapshot-monitor"));
deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() ->
forEachTopic(
Topic::checkDeduplicationSnapshot))
@@ -613,7 +615,7 @@ public class BrokerService implements Closeable {
if (topicTickTimeMs > 0) {
if (this.topicPublishRateLimiterMonitor == null) {
this.topicPublishRateLimiterMonitor =
Executors.newSingleThreadScheduledExecutor(
- new
DefaultThreadFactory("pulsar-topic-publish-rate-limiter-monitor"));
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-topic-publish-rate-limiter-monitor"));
// schedule task that sums up publish-rate across all cnx on a
topic
topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(()
-> checkTopicPublishThrottlingRate()),
topicTickTimeMs, topicTickTimeMs,
TimeUnit.MILLISECONDS);
@@ -646,7 +648,7 @@ public class BrokerService implements Closeable {
if (brokerTickTimeMs > 0) {
if (this.brokerPublishRateLimiterMonitor == null) {
this.brokerPublishRateLimiterMonitor =
Executors.newSingleThreadScheduledExecutor(
- new
DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
// schedule task that sums up publish-rate across all cnx on a
topic,
// and check the rate limit exceeded or not.
brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
index 0e59286861a..18625a62a23 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus.metrics;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Collector;
import java.io.IOException;
import java.io.Writer;
@@ -34,6 +33,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
/**
* A <i>Prometheus</i> based {@link StatsProvider} implementation.
@@ -90,7 +90,8 @@ public class PrometheusMetricsProvider implements
StatsProvider {
@Override
public void start(Configuration conf) {
- executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metrics"));
+ executor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("metrics"));
int latencyRolloverSeconds =
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
index 34340c3b0c5..3a77a8dccef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.tools.LoadReportCommand.Flags;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -88,7 +89,8 @@ public class LoadReportCommand extends CliCommand<CliFlags,
Flags> {
spec.console().println("--------------------------------------");
spec.console().println();
- ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("load-report"));
BrokerHostUsage hostUsage;
try {
if (isLinux) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index deaff573f79..6bc1d9a3adf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Strings;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -38,6 +37,7 @@ import
org.apache.pulsar.client.api.AutoClusterFailoverBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
@Slf4j
@Data
@@ -80,7 +80,7 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
this.intervalMs = builder.checkIntervalMs;
this.resolver = new PulsarServiceNameResolver();
this.executor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-service-provider"));
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 1a415200937..632563a0021 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -74,7 +74,7 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
this.currentPulsarServiceUrl = builder.defaultServiceUrl;
this.interval = builder.interval;
this.executor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-service-provider"));
+ new
ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
this.httpClient = buildHttpClient();
this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4d44ea90350..294e73cc673 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -26,7 +26,6 @@ import com.google.common.cache.LoadingCache;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Clock;
@@ -1044,7 +1043,7 @@ public class PulsarClientImpl implements PulsarClient {
}
private static ThreadFactory getThreadFactory(String poolName) {
- return new DefaultThreadFactory(poolName,
Thread.currentThread().isDaemon());
+ return new ExecutorProvider.ExtendedThreadFactory(poolName,
Thread.currentThread().isDaemon());
}
void cleanupProducer(ProducerBase<?> producer) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 15b76d3dfb5..8997d714194 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -40,18 +40,21 @@ public class ExecutorProvider {
private final String poolName;
private volatile boolean isShutdown;
- protected static class ExtendedThreadFactory extends DefaultThreadFactory {
-
+ public static class ExtendedThreadFactory extends DefaultThreadFactory {
@Getter
private volatile Thread thread;
+ public ExtendedThreadFactory(String poolName) {
+ super(poolName, false);
+ }
public ExtendedThreadFactory(String poolName, boolean daemon) {
super(poolName, daemon);
}
@Override
public Thread newThread(Runnable r) {
- Thread thread = super.newThread(r);
- this.thread = thread;
+ thread = super.newThread(r);
+ thread.setUncaughtExceptionHandler((t, e) ->
+ log.error("Thread {} got uncaught Exception", t.getName(),
e));
return thread;
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index fe7e0492916..38d85335bcd 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -24,6 +24,7 @@ import lombok.Getter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import org.apache.pulsar.client.util.ExecutorProvider;
public class InstanceCache {
@@ -34,7 +35,7 @@ public class InstanceCache {
private InstanceCache() {
ThreadFactory namedThreadFactory =
- new DefaultThreadFactory("function-timer-thread");
+ new
ExecutorProvider.ExtendedThreadFactory("function-timer-thread");
scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index 570408e975f..d15d2ba90c5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -20,6 +20,9 @@
package org.apache.pulsar.functions.worker;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.util.ExecutorProvider;
import java.util.HashMap;
import java.util.Map;
@@ -54,6 +58,17 @@ public class ClusterServiceCoordinator implements
AutoCloseable {
private final Supplier<Boolean> isLeader;
public ClusterServiceCoordinator(String workerId, LeaderService
leaderService, Supplier<Boolean> isLeader) {
+ this(workerId, leaderService, isLeader,
Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("cluster-service-coordinator-timer")));
+ }
+
+ @VisibleForTesting
+ ClusterServiceCoordinator(
+ String workerId,
+ LeaderService leaderService,
+ Supplier<Boolean> isLeader,
+ ScheduledExecutorService executor
+ ) {
this.workerId = workerId;
this.leaderService = leaderService;
this.isLeader = isLeader;