sijie closed pull request #3238: Fix NPE: namespaceService need leaderElection 
service
URL: https://github.com/apache/pulsar/pull/3238
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 74e91bee9f..f576f952e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -355,6 +355,9 @@ public void start() throws PulsarServerException {
             // Start load management service (even if load balancing is 
disabled)
             this.loadManager.set(LoadManager.create(this));
 
+            // Start the leader election service
+            startLeaderElectionService();
+
             // needs load management service
             this.startNamespaceService();
 
@@ -418,39 +421,6 @@ public Boolean get() {
             // Register heartbeat and bootstrap namespaces.
             this.nsservice.registerBootstrapNamespaces();
 
-            // Start the leader election service
-            this.leaderElectionService = new LeaderElectionService(this, new 
LeaderListener() {
-                @Override
-                public synchronized void brokerIsTheLeaderNow() {
-                    if (getConfiguration().isLoadBalancerEnabled()) {
-                        long loadSheddingInterval = TimeUnit.MINUTES
-                                
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
-                        long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-                                
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
-                        loadSheddingTask = 
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
-                                loadSheddingInterval, loadSheddingInterval, 
TimeUnit.MILLISECONDS);
-                        loadResourceQuotaTask = 
loadManagerExecutor.scheduleAtFixedRate(
-                                new LoadResourceQuotaUpdaterTask(loadManager), 
resourceQuotaUpdateInterval,
-                                resourceQuotaUpdateInterval, 
TimeUnit.MILLISECONDS);
-                    }
-                }
-
-                @Override
-                public synchronized void brokerIsAFollowerNow() {
-                    if (loadSheddingTask != null) {
-                        loadSheddingTask.cancel(false);
-                        loadSheddingTask = null;
-                    }
-                    if (loadResourceQuotaTask != null) {
-                        loadResourceQuotaTask.cancel(false);
-                        loadResourceQuotaTask = null;
-                    }
-                }
-            });
-
-            leaderElectionService.start();
-
             schemaRegistryService = SchemaRegistryService.create(this);
 
             webService.start();
@@ -480,6 +450,40 @@ public synchronized void brokerIsAFollowerNow() {
         }
     }
 
+    private void startLeaderElectionService() {
+        this.leaderElectionService = new LeaderElectionService(this, new 
LeaderListener() {
+            @Override
+            public synchronized void brokerIsTheLeaderNow() {
+                if (getConfiguration().isLoadBalancerEnabled()) {
+                    long loadSheddingInterval = TimeUnit.MINUTES
+                            
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+                    long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+                            
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+                    loadSheddingTask = 
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
+                            loadSheddingInterval, loadSheddingInterval, 
TimeUnit.MILLISECONDS);
+                    loadResourceQuotaTask = 
loadManagerExecutor.scheduleAtFixedRate(
+                            new LoadResourceQuotaUpdaterTask(loadManager), 
resourceQuotaUpdateInterval,
+                            resourceQuotaUpdateInterval, 
TimeUnit.MILLISECONDS);
+                }
+            }
+
+            @Override
+            public synchronized void brokerIsAFollowerNow() {
+                if (loadSheddingTask != null) {
+                    loadSheddingTask.cancel(false);
+                    loadSheddingTask = null;
+                }
+                if (loadResourceQuotaTask != null) {
+                    loadResourceQuotaTask.cancel(false);
+                    loadResourceQuotaTask = null;
+                }
+            }
+        });
+
+        leaderElectionService.start();
+    }
+
     private void acquireSLANamespace() {
         try {
             // Namespace not created hence no need to unload it


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to