danprince1 commented on a change in pull request #12013:
URL: https://github.com/apache/druid/pull/12013#discussion_r776867062



##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
       final int startingLeaderCounter = coordLeaderSelector.localTerm();
 
       final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = 
new ArrayList<>();
+      LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode = 
LoadRule.LoadRuleMode.ALL;
+      if (config.isLoadPrimaryReplicantSeparately()) {
+        dutiesRunnables.add(
+            Pair.of(
+                new DutiesRunnable(
+                    makePrimaryReplicantManagementDuties(),
+                    startingLeaderCounter,
+                    PRIMARY_REPLICANT_LOADING_DUTIES_GROUP,
+                    RunRules.RunRulesMode.LOAD_RULE_ONLY,
+                    LoadRule.LoadRuleMode.PRIMARY_ONLY
+                ),
+                config.getCoordinatorPrimaryReplicantLoaderPeriod()
+            )
+        );
+        historicalManagementDutiesLoadRuleMode = 
LoadRule.LoadRuleMode.REPLICA_ONLY;
+      }
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeHistoricalManagementDuties(), 
startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
+              new DutiesRunnable(
+                  makeHistoricalManagementDuties(),
+                  startingLeaderCounter,
+                  HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP,
+                  historicalManagementDutiesLoadRuleMode
+              ),
               config.getCoordinatorPeriod()
           )
       );
+      //noinspection VariableNotUsedInsideIf
       if (indexingServiceClient != null) {
         dutiesRunnables.add(
             Pair.of(
-                new DutiesRunnable(makeIndexingServiceDuties(), 
startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
+                new DutiesRunnable(
+                    makeIndexingServiceDuties(),
+                    startingLeaderCounter,
+                    INDEXING_SERVICE_DUTIES_DUTY_GROUP
+                ),
                 config.getCoordinatorIndexingPeriod()
             )
         );
       }
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeMetadataStoreManagementDuties(), 
startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+              new DutiesRunnable(
+                  makeMetadataStoreManagementDuties(),
+                  startingLeaderCounter,
+                  METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP
+              ),
               config.getCoordinatorMetadataStoreManagementPeriod()
           )
       );
 
       for (CoordinatorCustomDutyGroup customDutyGroup : 
customDutyGroups.getCoordinatorCustomDutyGroups()) {
         dutiesRunnables.add(
             Pair.of(
-                new DutiesRunnable(customDutyGroup.getCustomDutyList(), 
startingLeaderCounter, customDutyGroup.getName()),
+                new DutiesRunnable(
+                    customDutyGroup.getCustomDutyList(),
+                    startingLeaderCounter,
+                    customDutyGroup.getName()
+                ),
                 customDutyGroup.getPeriod()
             )
         );
         log.info(
             "Done making custom coordinator duties %s for group %s",
-            customDutyGroup.getCustomDutyList().stream().map(duty -> 
duty.getClass().getName()).collect(Collectors.toList()),
+            customDutyGroup.getCustomDutyList()
+                           .stream()
+                           .map(duty -> duty.getClass().getName())
+                           .collect(Collectors.toList()),
             customDutyGroup.getName()
         );
       }
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : 
dutiesRunnables) {
-        // CompactSegmentsDuty can takes a non trival amount of time to 
complete.
-        // Hence, we schedule at fixed rate to make sure the other tasks still 
run at approximately every
-        // config.getCoordinatorIndexingPeriod() period. Note that cautious 
should be taken
-        // if setting config.getCoordinatorIndexingPeriod() lower than the 
default value.
-        ScheduledExecutors.scheduleAtFixedRate(
-            exec,
-            config.getCoordinatorStartDelay(),
-            dutiesRunnable.rhs,
-            new Callable<ScheduledExecutors.Signal>()
-            {
-              private final DutiesRunnable theRunnable = dutiesRunnable.lhs;
-
-              @Override
-              public ScheduledExecutors.Signal call()
-              {
-                if (coordLeaderSelector.isLeader() && startingLeaderCounter == 
coordLeaderSelector.localTerm()) {
-                  theRunnable.run();
-                }
-                if (coordLeaderSelector.isLeader()
-                    && startingLeaderCounter == 
coordLeaderSelector.localTerm()) { // (We might no longer be leader)
-                  return ScheduledExecutors.Signal.REPEAT;
-                } else {
-                  return ScheduledExecutors.Signal.STOP;
-                }
+        // Note that caution should be taken if setting 
config.getCoordinatorIndexingPeriod() lower than the default
+        // value.
+        exec.scheduleAtFixedRate(

Review comment:
       Yeah, this is a good point, but I think we're OK.  This code used to use 
`ScheduledExecutors.scheduleAtFixedRate()`, which is a home-grown version of 
the JVM's `ScheduledExecutorService.scheduleAtFixedRate()`.  I found that the 
Druid version would often start multiple instances of the same Runnable at the 
same time, whereas the JVM version 
[guarantees](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleAtFixedRate-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-)
 that this won't happen.
   
   Yes, the Druid version has the signaling mechanism (returning 
`ScheduledExecutors.Signal.STOP`) that keeps the task from being scheduled 
again once this node is no longer the leader, but we also check for that at 
the beginning of `DruidCoordinator.run()` anyway, so it seems safe.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to