merlimat commented on a change in pull request #9240:
URL: https://github.com/apache/pulsar/pull/9240#discussion_r561199105
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -646,36 +669,45 @@ public Boolean get() {
}
}
- protected void startLeaderElectionService() {
- this.leaderElectionService = new LeaderElectionService(this, new
LeaderListener() {
- @Override
- public synchronized void brokerIsTheLeaderNow() {
- if (getConfiguration().isLoadBalancerEnabled()) {
- long loadSheddingInterval = TimeUnit.MINUTES
-
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
- long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
- loadSheddingTask =
loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
- loadSheddingInterval, loadSheddingInterval,
TimeUnit.MILLISECONDS);
- loadResourceQuotaTask =
loadManagerExecutor.scheduleAtFixedRate(
- new LoadResourceQuotaUpdaterTask(loadManager),
resourceQuotaUpdateInterval,
- resourceQuotaUpdateInterval,
TimeUnit.MILLISECONDS);
- }
- }
+ public MetadataStoreExtended createLocalMetadataStore() throws
MetadataStoreException {
+ return MetadataStoreExtended.create(config.getZookeeperServers(),
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis((int)
config.getZooKeeperSessionTimeoutMillis())
+ .allowReadOnlyOperations(false)
+ .build());
+ }
- @Override
- public synchronized void brokerIsAFollowerNow() {
- if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
- loadSheddingTask = null;
- }
- if (loadResourceQuotaTask != null) {
- loadResourceQuotaTask.cancel(false);
- loadResourceQuotaTask = null;
- }
- }
- });
+ protected void startLeaderElectionService() {
+ this.leaderElectionService = new
LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
+ state -> {
+ if (state == LeaderElectionState.Leading) {
+ LOG.info("This broker was elected leader");
+ if (getConfiguration().isLoadBalancerEnabled()) {
+ long loadSheddingInterval = TimeUnit.MINUTES
+
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+ long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+ loadSheddingTask =
loadManagerExecutor.scheduleAtFixedRate(
Review comment:
In theory it shouldn't happen, but it's a good point, I've added the
cancel if it's already set.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]