This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea6f366 Fix NPE: namespaceService need leaderElection service (#3238)
ea6f366 is described below
commit ea6f366997c7a693f926df4d340ff87e9177db1d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Dec 25 21:35:33 2018 -0800
Fix NPE: namespaceService need leaderElection service (#3238)
### Motivation
namespace-service uses leaderElectionService so, leaderElectionService
should start before namespace-service.
```
Caused by: java.lang.NullPointerException
at
org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:376)
~[pulsar-broker-2.2.jar]
... 9 more
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[?:1.8.0_131]
at
org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:395)
~[pulsar-broker-2.2.jar]
at
org.apache.pulsar.broker.namespace.NamespaceService.lambda$22(NamespaceService.java:335)
~[pulsar-broker-2.2.jar]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_131]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_131]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_131]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[?:1.8.0_131]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[?:1.8.0_131]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[pulsar-functions-metrics-2.2.2-yahoo.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
```
---
.../org/apache/pulsar/broker/PulsarService.java | 70 ++++++++++++----------
1 file changed, 37 insertions(+), 33 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 74e91be..f576f95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -355,6 +355,9 @@ public class PulsarService implements AutoCloseable {
// Start load management service (even if load balancing is
disabled)
this.loadManager.set(LoadManager.create(this));
+ // Start the leader election service
+ startLeaderElectionService();
+
// needs load management service
this.startNamespaceService();
@@ -418,39 +421,6 @@ public class PulsarService implements AutoCloseable {
// Register heartbeat and bootstrap namespaces.
this.nsservice.registerBootstrapNamespaces();
- // Start the leader election service
- this.leaderElectionService = new LeaderElectionService(this, new
LeaderListener() {
- @Override
- public synchronized void brokerIsTheLeaderNow() {
- if (getConfiguration().isLoadBalancerEnabled()) {
- long loadSheddingInterval = TimeUnit.MINUTES
-
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
- long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
- loadSheddingTask =
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
- loadSheddingInterval, loadSheddingInterval,
TimeUnit.MILLISECONDS);
- loadResourceQuotaTask =
loadManagerExecutor.scheduleAtFixedRate(
- new LoadResourceQuotaUpdaterTask(loadManager),
resourceQuotaUpdateInterval,
- resourceQuotaUpdateInterval,
TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public synchronized void brokerIsAFollowerNow() {
- if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
- loadSheddingTask = null;
- }
- if (loadResourceQuotaTask != null) {
- loadResourceQuotaTask.cancel(false);
- loadResourceQuotaTask = null;
- }
- }
- });
-
- leaderElectionService.start();
-
schemaRegistryService = SchemaRegistryService.create(this);
webService.start();
@@ -480,6 +450,40 @@ public class PulsarService implements AutoCloseable {
}
}
+ private void startLeaderElectionService() {
+ this.leaderElectionService = new LeaderElectionService(this, new
LeaderListener() {
+ @Override
+ public synchronized void brokerIsTheLeaderNow() {
+ if (getConfiguration().isLoadBalancerEnabled()) {
+ long loadSheddingInterval = TimeUnit.MINUTES
+
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+ long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+ loadSheddingTask =
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
+ loadSheddingInterval, loadSheddingInterval,
TimeUnit.MILLISECONDS);
+ loadResourceQuotaTask =
loadManagerExecutor.scheduleAtFixedRate(
+ new LoadResourceQuotaUpdaterTask(loadManager),
resourceQuotaUpdateInterval,
+ resourceQuotaUpdateInterval,
TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public synchronized void brokerIsAFollowerNow() {
+ if (loadSheddingTask != null) {
+ loadSheddingTask.cancel(false);
+ loadSheddingTask = null;
+ }
+ if (loadResourceQuotaTask != null) {
+ loadResourceQuotaTask.cancel(false);
+ loadResourceQuotaTask = null;
+ }
+ }
+ });
+
+ leaderElectionService.start();
+ }
+
private void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it