merlimat commented on a change in pull request #9240:
URL: https://github.com/apache/pulsar/pull/9240#discussion_r561199105



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -646,36 +669,45 @@ public Boolean get() {
         }
     }
 
-    protected void startLeaderElectionService() {
-        this.leaderElectionService = new LeaderElectionService(this, new 
LeaderListener() {
-            @Override
-            public synchronized void brokerIsTheLeaderNow() {
-                if (getConfiguration().isLoadBalancerEnabled()) {
-                    long loadSheddingInterval = TimeUnit.MINUTES
-                            
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
-                    long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-                            
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
-                    loadSheddingTask = 
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
-                            loadSheddingInterval, loadSheddingInterval, 
TimeUnit.MILLISECONDS);
-                    loadResourceQuotaTask = 
loadManagerExecutor.scheduleAtFixedRate(
-                            new LoadResourceQuotaUpdaterTask(loadManager), 
resourceQuotaUpdateInterval,
-                            resourceQuotaUpdateInterval, 
TimeUnit.MILLISECONDS);
-                }
-            }
+    public MetadataStoreExtended createLocalMetadataStore() throws 
MetadataStoreException {
+        return MetadataStoreExtended.create(config.getZookeeperServers(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis((int) 
config.getZooKeeperSessionTimeoutMillis())
+                        .allowReadOnlyOperations(false)
+                        .build());
+    }
 
-            @Override
-            public synchronized void brokerIsAFollowerNow() {
-                if (loadSheddingTask != null) {
-                    loadSheddingTask.cancel(false);
-                    loadSheddingTask = null;
-                }
-                if (loadResourceQuotaTask != null) {
-                    loadResourceQuotaTask.cancel(false);
-                    loadResourceQuotaTask = null;
-                }
-            }
-        });
+    protected void startLeaderElectionService() {
+        this.leaderElectionService = new 
LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
+                state -> {
+                    if (state == LeaderElectionState.Leading) {
+                        LOG.info("This broker was elected leader");
+                        if (getConfiguration().isLoadBalancerEnabled()) {
+                            long loadSheddingInterval = TimeUnit.MINUTES
+                                    
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+                            long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+                                    
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+                            loadSheddingTask = 
loadManagerExecutor.scheduleAtFixedRate(

Review comment:
       In theory it shouldn't happen, but it's a good point. I've added a 
cancel of the previously scheduled task if it's already set.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to