This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 07a193a142 Use separate executor for each coordinator duty group (#14869)
07a193a142 is described below
commit 07a193a1420b2942e5bc9ac97058438f5435102d
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 21 15:53:22 2023 +0530
Use separate executor for each coordinator duty group (#14869)
Changes:
- Use a separate executor for every duty group.
- This change is thread-safe, as every duty group uses its own copy of
  `DruidCoordinatorRuntimeParams` and does not share any other mutable instances
  with other duty groups.
- With the exception of `HistoricalManagementDuties`, duty groups are typically not
  very compute-intensive and mostly perform database or HTTP I/O. So, coordinator
  resources would still mostly be available for `HistoricalManagementDuties`.
---
.../druid/server/coordinator/DruidCoordinator.java | 42 +++++++++++++++-------
.../simulate/CoordinatorSimulationBuilder.java | 14 ++++----
2 files changed, 36 insertions(+), 20 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index a7b8417351..363a5758d0 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -20,12 +20,12 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
@@ -46,6 +46,7 @@ import
org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -99,7 +100,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -141,7 +141,8 @@ public class DruidCoordinator
private final ServiceEmitter emitter;
private final OverlordClient overlordClient;
- private final ScheduledExecutorService exec;
+ private final ScheduledExecutorFactory executorFactory;
+ private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new
HashMap<>();
private final LoadQueueTaskMaster taskMaster;
private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons =
new ConcurrentHashMap<>();
private final SegmentLoadQueueManager loadQueueManager;
@@ -216,7 +217,7 @@ public class DruidCoordinator
this.metadataStoreManagementDuties = metadataStoreManagementDuties;
this.customDutyGroups = customDutyGroups;
- this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
+ this.executorFactory = scheduledExecutorFactory;
this.balancerStrategyFactory = balancerStrategyFactory;
this.lookupCoordinatorManager = lookupCoordinatorManager;
@@ -395,7 +396,7 @@ public class DruidCoordinator
started = false;
- exec.shutdownNow();
+ stopAllDutyGroupExecutors();
if (balancerExec != null) {
balancerExec.shutdownNow();
@@ -492,12 +493,10 @@ public class DruidCoordinator
}
for (final DutiesRunnable dutiesRunnable : dutiesRunnables) {
- // CompactSegmentsDuty can takes a non trival amount of time to
complete.
- // Hence, we schedule at fixed rate to make sure the other tasks still
run at approximately every
- // config.getCoordinatorIndexingPeriod() period. Note that cautious
should be taken
- // if setting config.getCoordinatorIndexingPeriod() lower than the
default value.
+ // Several coordinator duties can take a non trival amount of time to
complete.
+ // Hence, we schedule each duty group on a dedicated executor
ScheduledExecutors.scheduleAtFixedRate(
- exec,
+ getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName),
config.getCoordinatorStartDelay(),
dutiesRunnable.getPeriod(),
() -> {
@@ -544,6 +543,22 @@ public class DruidCoordinator
}
}
+ @GuardedBy("lock")
+ private ScheduledExecutorService getOrCreateDutyGroupExecutor(String
dutyGroup)
+ {
+ return dutyGroupExecutors.computeIfAbsent(
+ dutyGroup,
+ group -> executorFactory.create(1, "Coordinator-Exec-" + dutyGroup +
"-%d")
+ );
+ }
+
+ @GuardedBy("lock")
+ private void stopAllDutyGroupExecutors()
+ {
+ dutyGroupExecutors.values().forEach(ScheduledExecutorService::shutdownNow);
+ dutyGroupExecutors.clear();
+ }
+
/**
* Resets the balancerExec if required and creates a new BalancerStrategy for
* the current coordinator run.
@@ -733,7 +748,7 @@ public class DruidCoordinator
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
- dutyRunTime.reset().start();
+ dutyRunTime.restart();
params = duty.run(params);
dutyRunTime.stop();
@@ -743,7 +758,7 @@ public class DruidCoordinator
return;
} else {
final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
- final long dutyRunMillis =
dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
+ final long dutyRunMillis = dutyRunTime.millisElapsed();
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey,
dutyRunMillis);
}
}
@@ -769,7 +784,8 @@ public class DruidCoordinator
}
// Emit the runtime of the full DutiesRunnable
- final long runMillis =
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
+ groupRunTime.stop();
+ final long runMillis = groupRunTime.millisElapsed();
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(),
runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms.%n",
dutyGroupName, runMillis);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index e40bf8ee16..5ad7ffab36 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -277,6 +277,7 @@ public class CoordinatorSimulationBuilder
try {
env.setUp();
coordinator.start();
+ env.executorFactory.findExecutors();
}
catch (Exception e) {
throw new ISE(e, "Exception while running simulation");
@@ -309,8 +310,8 @@ public class CoordinatorSimulationBuilder
verifySimulationRunning();
env.serviceEmitter.flush();
- // Invoke historical duties and metadata duties
- env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+ // Invoke historical duties
+ env.executorFactory.historicalDutiesRunner.finishNextPendingTasks(1);
}
@Override
@@ -504,7 +505,6 @@ public class CoordinatorSimulationBuilder
inventory.setUp();
coordinatorInventoryView.setUp();
lifecycle.start();
- executorFactory.setUp();
leaderSelector.becomeLeader();
EasyMock.replay(mocks.toArray());
}
@@ -554,7 +554,7 @@ public class CoordinatorSimulationBuilder
static final String HISTORICAL_LOADER = "historical-loader-%d";
static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d";
static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d";
- static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d";
+ static final String COORDINATOR_RUNNER =
"Coordinator-Exec-HistoricalManagementDuties-%d";
private final Map<String, BlockingExecutorService> blockingExecutors = new
HashMap<>();
private final boolean directExecution;
@@ -562,7 +562,7 @@ public class CoordinatorSimulationBuilder
private BlockingExecutorService historicalLoader;
private BlockingExecutorService loadQueueExecutor;
private BlockingExecutorService loadCallbackExecutor;
- private BlockingExecutorService coordinatorRunner;
+ private BlockingExecutorService historicalDutiesRunner;
private ExecutorFactory(boolean directExecution)
{
@@ -588,9 +588,9 @@ public class CoordinatorSimulationBuilder
return blockingExecutors.get(nameFormat);
}
- private void setUp()
+ private void findExecutors()
{
- coordinatorRunner = findExecutor(COORDINATOR_RUNNER);
+ historicalDutiesRunner = findExecutor(COORDINATOR_RUNNER);
historicalLoader = findExecutor(HISTORICAL_LOADER);
loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR);
loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]