eolivelli commented on a change in pull request #9240:
URL: https://github.com/apache/pulsar/pull/9240#discussion_r560737419



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -646,36 +669,45 @@ public Boolean get() {
         }
     }
 
-    protected void startLeaderElectionService() {
-        this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
-            @Override
-            public synchronized void brokerIsTheLeaderNow() {
-                if (getConfiguration().isLoadBalancerEnabled()) {
-                    long loadSheddingInterval = TimeUnit.MINUTES
-                            .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
-                    long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-                            .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
-                    loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
-                            loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
-                    loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
-                            new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
-                            resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
-                }
-            }
+    public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(config.getZookeeperServers(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
+                        .allowReadOnlyOperations(false)
+                        .build());
+    }
 
-            @Override
-            public synchronized void brokerIsAFollowerNow() {
-                if (loadSheddingTask != null) {
-                    loadSheddingTask.cancel(false);
-                    loadSheddingTask = null;
-                }
-                if (loadResourceQuotaTask != null) {
-                    loadResourceQuotaTask.cancel(false);
-                    loadResourceQuotaTask = null;
-                }
-            }
-        });
+    protected void startLeaderElectionService() {
+        this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
+                state -> {
+                    if (state == LeaderElectionState.Leading) {
+                        LOG.info("This broker was elected leader");
+                        if (getConfiguration().isLoadBalancerEnabled()) {
+                            long loadSheddingInterval = TimeUnit.MINUTES
+                                    .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+                            long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+                                    .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+                            loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(

Review comment:
Could `loadSheddingTask` already hold a value at this point (for example, because of some rare bug in the LeaderElectionService that reports `Leading` twice)?
Should we try to cancel the existing task before scheduling a new one? The same applies to `loadResourceQuotaTask`.
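
A minimal sketch of the cancel-before-reschedule guard the comment asks about. The `LeaderTasks` class and its `onLeading` / `onFollowing` methods are hypothetical names used only for illustration; they are not part of Pulsar, and the real change would live inside the `state -> { ... }` callback in `PulsarService`. Only one task is shown; `loadResourceQuotaTask` would be handled the same way.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical holder class (an assumption for this sketch, not Pulsar code).
class LeaderTasks {
    private final ScheduledExecutorService executor;
    private ScheduledFuture<?> loadSheddingTask;

    LeaderTasks(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    // Called when this broker becomes leader. Any task left over from an
    // earlier (possibly duplicated) notification is cancelled first, so at
    // most one periodic task is ever scheduled.
    synchronized ScheduledFuture<?> onLeading(Runnable task, long intervalMillis) {
        cancelIfPresent();
        loadSheddingTask = executor.scheduleAtFixedRate(
                task, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return loadSheddingTask;
    }

    // Called when this broker is no longer the leader.
    synchronized void onFollowing() {
        cancelIfPresent();
    }

    private void cancelIfPresent() {
        if (loadSheddingTask != null) {
            // cancel(false): do not interrupt a run that is already in progress.
            loadSheddingTask.cancel(false);
            loadSheddingTask = null;
        }
    }
}
```

With this guard, a second `Leading` notification replaces the first task instead of leaving two periodic tasks running side by side.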



