This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 07a193a142 Use separate executor for each coordinator duty group 
(#14869)
07a193a142 is described below

commit 07a193a1420b2942e5bc9ac97058438f5435102d
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 21 15:53:22 2023 +0530

    Use separate executor for each coordinator duty group (#14869)
    
    Changes:
    - Use a separate executor for every duty group
    - This change is thread-safe as every duty group uses its own copy of
    `DruidCoordinatorRuntimeParams` and does not share any other mutable instances
    with other duty groups.
    - With the exception of `HistoricalManagementDuties`, duty groups are typically not
    very compute-intensive and mostly perform database or HTTP I/O. So, coordinator
    resources would still mostly be available for `HistoricalManagementDuties`.
---
 .../druid/server/coordinator/DruidCoordinator.java | 42 +++++++++++++++-------
 .../simulate/CoordinatorSimulationBuilder.java     | 14 ++++----
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index a7b8417351..363a5758d0 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -20,12 +20,12 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
@@ -46,6 +46,7 @@ import 
org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
 import 
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -99,7 +100,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -141,7 +141,8 @@ public class DruidCoordinator
 
   private final ServiceEmitter emitter;
   private final OverlordClient overlordClient;
-  private final ScheduledExecutorService exec;
+  private final ScheduledExecutorFactory executorFactory;
+  private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new 
HashMap<>();
   private final LoadQueueTaskMaster taskMaster;
   private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = 
new ConcurrentHashMap<>();
   private final SegmentLoadQueueManager loadQueueManager;
@@ -216,7 +217,7 @@ public class DruidCoordinator
     this.metadataStoreManagementDuties = metadataStoreManagementDuties;
     this.customDutyGroups = customDutyGroups;
 
-    this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
+    this.executorFactory = scheduledExecutorFactory;
 
     this.balancerStrategyFactory = balancerStrategyFactory;
     this.lookupCoordinatorManager = lookupCoordinatorManager;
@@ -395,7 +396,7 @@ public class DruidCoordinator
 
       started = false;
 
-      exec.shutdownNow();
+      stopAllDutyGroupExecutors();
 
       if (balancerExec != null) {
         balancerExec.shutdownNow();
@@ -492,12 +493,10 @@ public class DruidCoordinator
       }
 
       for (final DutiesRunnable dutiesRunnable : dutiesRunnables) {
-        // CompactSegmentsDuty can takes a non trival amount of time to 
complete.
-        // Hence, we schedule at fixed rate to make sure the other tasks still 
run at approximately every
-        // config.getCoordinatorIndexingPeriod() period. Note that cautious 
should be taken
-        // if setting config.getCoordinatorIndexingPeriod() lower than the 
default value.
+        // Several coordinator duties can take a non trival amount of time to 
complete.
+        // Hence, we schedule each duty group on a dedicated executor
         ScheduledExecutors.scheduleAtFixedRate(
-            exec,
+            getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName),
             config.getCoordinatorStartDelay(),
             dutiesRunnable.getPeriod(),
             () -> {
@@ -544,6 +543,22 @@ public class DruidCoordinator
     }
   }
 
+  @GuardedBy("lock")
+  private ScheduledExecutorService getOrCreateDutyGroupExecutor(String 
dutyGroup)
+  {
+    return dutyGroupExecutors.computeIfAbsent(
+        dutyGroup,
+        group -> executorFactory.create(1, "Coordinator-Exec-" + dutyGroup + 
"-%d")
+    );
+  }
+
+  @GuardedBy("lock")
+  private void stopAllDutyGroupExecutors()
+  {
+    dutyGroupExecutors.values().forEach(ScheduledExecutorService::shutdownNow);
+    dutyGroupExecutors.clear();
+  }
+
   /**
    * Resets the balancerExec if required and creates a new BalancerStrategy for
    * the current coordinator run.
@@ -733,7 +748,7 @@ public class DruidCoordinator
               && coordLeaderSelector.isLeader()
               && startingLeaderCounter == coordLeaderSelector.localTerm()) {
 
-            dutyRunTime.reset().start();
+            dutyRunTime.restart();
             params = duty.run(params);
             dutyRunTime.stop();
 
@@ -743,7 +758,7 @@ public class DruidCoordinator
               return;
             } else {
               final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
-              final long dutyRunMillis = 
dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
+              final long dutyRunMillis = dutyRunTime.millisElapsed();
               
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, 
dutyRunMillis);
             }
           }
@@ -769,7 +784,8 @@ public class DruidCoordinator
         }
 
         // Emit the runtime of the full DutiesRunnable
-        final long runMillis = 
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
+        groupRunTime.stop();
+        final long runMillis = groupRunTime.millisElapsed();
         emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), 
runMillis);
         log.info("Finished coordinator run for group [%s] in [%d] ms.%n", 
dutyGroupName, runMillis);
       }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index e40bf8ee16..5ad7ffab36 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -277,6 +277,7 @@ public class CoordinatorSimulationBuilder
       try {
         env.setUp();
         coordinator.start();
+        env.executorFactory.findExecutors();
       }
       catch (Exception e) {
         throw new ISE(e, "Exception while running simulation");
@@ -309,8 +310,8 @@ public class CoordinatorSimulationBuilder
       verifySimulationRunning();
       env.serviceEmitter.flush();
 
-      // Invoke historical duties and metadata duties
-      env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+      // Invoke historical duties
+      env.executorFactory.historicalDutiesRunner.finishNextPendingTasks(1);
     }
 
     @Override
@@ -504,7 +505,6 @@ public class CoordinatorSimulationBuilder
       inventory.setUp();
       coordinatorInventoryView.setUp();
       lifecycle.start();
-      executorFactory.setUp();
       leaderSelector.becomeLeader();
       EasyMock.replay(mocks.toArray());
     }
@@ -554,7 +554,7 @@ public class CoordinatorSimulationBuilder
     static final String HISTORICAL_LOADER = "historical-loader-%d";
     static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d";
     static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d";
-    static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d";
+    static final String COORDINATOR_RUNNER = 
"Coordinator-Exec-HistoricalManagementDuties-%d";
 
     private final Map<String, BlockingExecutorService> blockingExecutors = new 
HashMap<>();
     private final boolean directExecution;
@@ -562,7 +562,7 @@ public class CoordinatorSimulationBuilder
     private BlockingExecutorService historicalLoader;
     private BlockingExecutorService loadQueueExecutor;
     private BlockingExecutorService loadCallbackExecutor;
-    private BlockingExecutorService coordinatorRunner;
+    private BlockingExecutorService historicalDutiesRunner;
 
     private ExecutorFactory(boolean directExecution)
     {
@@ -588,9 +588,9 @@ public class CoordinatorSimulationBuilder
       return blockingExecutors.get(nameFormat);
     }
 
-    private void setUp()
+    private void findExecutors()
     {
-      coordinatorRunner = findExecutor(COORDINATOR_RUNNER);
+      historicalDutiesRunner = findExecutor(COORDINATOR_RUNNER);
       historicalLoader = findExecutor(HISTORICAL_LOADER);
       loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR);
       loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to