This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9376d8d6e1 Refactor: Move `UpdateCoordinatorStateAndPrepareCluster`
duty out of `DruidCoordinator` (#14845)
9376d8d6e1 is described below
commit 9376d8d6e1c952a57aef1aa6e863547f95c338a7
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Aug 22 19:50:41 2023 +0530
Refactor: Move `UpdateCoordinatorStateAndPrepareCluster` duty out of
`DruidCoordinator` (#14845)
Motivation:
- Clean up `DruidCoordinator` and move methods to classes where they are
most relevant
Changes:
- No functional change
- Add duty `PrepareBalancerAndLoadQueues` to replace
`UpdateCoordinatorStateAndPrepareCluster`
- Move map of `LoadQueuePeon` from `DruidCoordinator` to
`LoadQueueTaskMaster`
- Make `BalancerStrategyFactory` an abstract class and move the balancer
executor from `DruidCoordinator` into it
- Move reporting of used segment stats and historical capacity stats from
`CollectSegmentAndServerStats` to `PrepareBalancerAndLoadQueues`
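- Move reporting of unavailable and under-replicated segment stats from
`CollectSegmentAndServerStats` to `UpdateReplicationStatus` duty
- Move debug logging of servers and their datasources to
`CollectSegmentAndServerStats`
- Rename `DruidCoordinatorRuntimeParams` builder method
`withSnapshotOfDataSourcesWithAllUsedSegments` to `withDataSourcesSnapshot`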
---
.../druid/server/coordinator/DruidCoordinator.java | 253 +++------------------
.../coordinator/DruidCoordinatorRuntimeParams.java | 2 +-
.../balancer/BalancerStrategyFactory.java | 45 +++-
.../CachingCostBalancerStrategyFactory.java | 5 +-
.../balancer/CostBalancerStrategyFactory.java | 8 +-
...DisabledCachingCostBalancerStrategyFactory.java | 7 +-
.../DiskNormalizedCostBalancerStrategyFactory.java | 8 +-
.../balancer/RandomBalancerStrategyFactory.java | 6 +-
.../duty/CollectSegmentAndServerStats.java | 70 ++----
.../duty/PrepareBalancerAndLoadQueues.java | 196 ++++++++++++++++
.../coordinator/loading/LoadQueueTaskMaster.java | 80 ++++++-
.../server/coordinator/DruidCoordinatorTest.java | 71 +-----
.../balancer/BalancerStrategyFactoryTest.java | 45 ++--
.../coordinator/duty/BalanceSegmentsTest.java | 4 +-
.../duty/CollectSegmentAndServerStatsTest.java | 22 +-
.../coordinator/duty/CompactSegmentsTest.java | 2 +-
.../duty/MarkOvershadowedSegmentsAsUnusedTest.java | 2 +-
17 files changed, 437 insertions(+), 389 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 832f7790ad..faa399ba73 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -23,8 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
@@ -35,7 +33,6 @@ import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.common.config.JacksonConfigManager;
@@ -47,7 +44,6 @@ import
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -60,7 +56,6 @@ import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
@@ -70,12 +65,12 @@ import
org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import
org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
+import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -98,7 +93,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -144,7 +138,6 @@ public class DruidCoordinator
private final ScheduledExecutorFactory executorFactory;
private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new
HashMap<>();
private final LoadQueueTaskMaster taskMaster;
- private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons =
new ConcurrentHashMap<>();
private final SegmentLoadQueueManager loadQueueManager;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
@@ -171,9 +164,6 @@ public class DruidCoordinator
*/
private volatile SegmentReplicationStatus segmentReplicationStatus = null;
- private int cachedBalancerThreadNumber;
- private ListeningExecutorService balancerExec;
-
public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP =
"HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP =
"MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP =
"IndexingServiceDuties";
@@ -233,7 +223,7 @@ public class DruidCoordinator
public Map<String, LoadQueuePeon> getLoadManagementPeons()
{
- return loadManagementPeons;
+ return taskMaster.getAllPeons();
}
public Map<String, Object2LongMap<String>>
getTierToDatasourceToUnderReplicatedCount(boolean useClusterView)
@@ -344,18 +334,6 @@ public class DruidCoordinator
return coordLeaderSelector.getCurrentLeader();
}
- @VisibleForTesting
- public int getCachedBalancerThreadNumber()
- {
- return cachedBalancerThreadNumber;
- }
-
- @VisibleForTesting
- public ListeningExecutorService getBalancerExec()
- {
- return balancerExec;
- }
-
@LifecycleStart
public void start()
{
@@ -397,10 +375,7 @@ public class DruidCoordinator
started = false;
stopAllDutyGroupExecutors();
-
- if (balancerExec != null) {
- balancerExec.shutdownNow();
- }
+ balancerStrategyFactory.stopExecutor();
}
}
@@ -440,6 +415,7 @@ public class DruidCoordinator
config.getCoordinatorStartDelay()
);
+ taskMaster.onLeaderStart();
segmentsMetadataManager.startPollingDatabasePeriodically();
segmentsMetadataManager.populateUsedFlagLastUpdatedAsync();
metadataRuleManager.start();
@@ -524,22 +500,13 @@ public class DruidCoordinator
log.info("I am no longer the leader...");
- for (String server : loadManagementPeons.keySet()) {
- LoadQueuePeon peon = loadManagementPeons.remove(server);
- peon.stop();
- }
- loadManagementPeons.clear();
-
+ taskMaster.onLeaderStop();
serviceAnnouncer.unannounce(self);
lookupCoordinatorManager.stop();
metadataRuleManager.stop();
segmentsMetadataManager.stopPollingDatabasePeriodically();
segmentsMetadataManager.stopAsyncUsedFlagLastUpdatedUpdate();
-
- if (balancerExec != null) {
- balancerExec.shutdownNow();
- balancerExec = null;
- }
+ balancerStrategyFactory.stopExecutor();
}
}
@@ -559,53 +526,21 @@ public class DruidCoordinator
dutyGroupExecutors.clear();
}
- /**
- * Resets the balancerExec if required and creates a new BalancerStrategy for
- * the current coordinator run.
- */
- @VisibleForTesting
- BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
- {
- // Reset balancerExecutor if required
- if (balancerExec == null) {
- balancerExec = createNewBalancerExecutor(balancerComputeThreads);
- } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
- log.info(
- "'balancerComputeThreads' has changed from [%d] to [%d]",
- cachedBalancerThreadNumber, balancerComputeThreads
- );
- balancerExec.shutdownNow();
- balancerExec = createNewBalancerExecutor(balancerComputeThreads);
- }
-
- // Create BalancerStrategy
- final BalancerStrategy balancerStrategy =
balancerStrategyFactory.createBalancerStrategy(balancerExec);
- log.info(
- "Using balancer strategy[%s] with [%d] threads.",
- balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
- );
- return balancerStrategy;
- }
-
- private ListeningExecutorService createNewBalancerExecutor(int numThreads)
- {
- log.info("Creating new balancer executor with [%d] threads.", numThreads);
- cachedBalancerThreadNumber = numThreads;
- return MoreExecutors.listeningDecorator(
- Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
- );
- }
-
private List<CoordinatorDuty> makeHistoricalManagementDuties()
{
return ImmutableList.of(
- new UpdateCoordinatorStateAndPrepareCluster(),
+ new PrepareBalancerAndLoadQueues(
+ taskMaster,
+ loadQueueManager,
+ balancerStrategyFactory,
+ serverInventoryView
+ ),
new RunRules(segmentsMetadataManager::markSegmentsAsUnused),
new UpdateReplicationStatus(),
new UnloadUnusedSegments(loadQueueManager),
new
MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
new BalanceSegments(config.getCoordinatorPeriod()),
- new CollectSegmentAndServerStats(DruidCoordinator.this)
+ new CollectSegmentAndServerStats(taskMaster)
);
}
@@ -724,7 +659,7 @@ public class DruidCoordinator
DruidCoordinatorRuntimeParams
.newBuilder(coordinatorStartTime)
.withDatabaseRuleManager(metadataRuleManager)
-
.withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
+ .withDataSourcesSnapshot(dataSourcesSnapshot)
.withDynamicConfigs(getDynamicConfigs())
.withCompactionConfig(getCompactionConfig())
.build();
@@ -816,149 +751,6 @@ public class DruidCoordinator
}
}
- /**
- * This duty does the following:
- * <ul>
- * <li>Prepares an immutable {@link DruidCluster} consisting of {@link
ServerHolder}s
- * which represent the current state of the servers in the cluster.</li>
- * <li>Starts and stops load peons for new and disappeared servers
respectively.</li>
- * <li>Cancels in-progress loads on all decommissioning servers. This is
done
- * here to ensure that under-replicated segments are assigned to active
servers
- * in the {@link RunRules} duty after this.</li>
- * <li>Initializes the {@link BalancerStrategy} for the run.</li>
- * </ul>
- *
- * @see #makeHistoricalManagementDuties() for the order of duties
- */
- private class UpdateCoordinatorStateAndPrepareCluster implements
CoordinatorDuty
- {
- @Nullable
- @Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
- {
- List<ImmutableDruidServer> currentServers = prepareCurrentServers();
-
- startPeonsForNewServers(currentServers);
- stopPeonsForDisappearedServers(currentServers);
-
- final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
- final SegmentLoadingConfig segmentLoadingConfig =
params.getSegmentLoadingConfig();
-
- final DruidCluster cluster = prepareCluster(dynamicConfig,
segmentLoadingConfig, currentServers);
- cancelLoadsOnDecommissioningServers(cluster);
-
- final BalancerStrategy balancerStrategy
- =
createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
- return params.buildFromExisting()
- .withDruidCluster(cluster)
- .withBalancerStrategy(balancerStrategy)
- .withSegmentAssignerUsing(loadQueueManager)
- .build();
- }
-
- /**
- * Cancels all load/move operations on decommissioning servers. This should
- * be done before initializing the SegmentReplicantLookup so that
- * under-replicated segments can be assigned in the current run itself.
- */
- private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
- {
- final AtomicInteger cancelledCount = new AtomicInteger(0);
- final List<ServerHolder> decommissioningServers
- = cluster.getAllServers().stream()
- .filter(ServerHolder::isDecommissioning)
- .collect(Collectors.toList());
-
- for (ServerHolder server : decommissioningServers) {
- server.getQueuedSegments().forEach(
- (segment, action) -> {
- // Cancel the operation if it is a type of load
- if (action.isLoad() && server.cancelOperation(action, segment)) {
- cancelledCount.incrementAndGet();
- }
- }
- );
- }
-
- if (cancelledCount.get() > 0) {
- log.info(
- "Cancelled [%d] load/move operations on [%d] decommissioning
servers.",
- cancelledCount.get(), decommissioningServers.size()
- );
- }
- }
-
- List<ImmutableDruidServer> prepareCurrentServers()
- {
- List<ImmutableDruidServer> currentServers = serverInventoryView
- .getInventory()
- .stream()
- .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
- .map(DruidServer::toImmutableDruidServer)
- .collect(Collectors.toList());
-
- if (log.isDebugEnabled()) {
- // Display info about all segment-replicatable (historical and bridge)
servers
- log.debug("Servers");
- for (ImmutableDruidServer druidServer : currentServers) {
- log.debug(" %s", druidServer);
- log.debug(" -- DataSources");
- for (ImmutableDruidDataSource druidDataSource :
druidServer.getDataSources()) {
- log.debug(" %s", druidDataSource);
- }
- }
- }
- return currentServers;
- }
-
- void startPeonsForNewServers(List<ImmutableDruidServer> currentServers)
- {
- for (ImmutableDruidServer server : currentServers) {
- loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
- LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
- loadQueuePeon.start();
- log.debug("Created LoadQueuePeon for server[%s].", server.getName());
- return loadQueuePeon;
- });
- }
- }
-
- DruidCluster prepareCluster(
- CoordinatorDynamicConfig dynamicConfig,
- SegmentLoadingConfig segmentLoadingConfig,
- List<ImmutableDruidServer> currentServers
- )
- {
- final Set<String> decommissioningServers =
dynamicConfig.getDecommissioningNodes();
- final DruidCluster.Builder cluster = DruidCluster.builder();
- for (ImmutableDruidServer server : currentServers) {
- cluster.add(
- new ServerHolder(
- server,
- loadManagementPeons.get(server.getName()),
- decommissioningServers.contains(server.getHost()),
- segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
- segmentLoadingConfig.getMaxLifetimeInLoadQueue()
- )
- );
- }
- return cluster.build();
- }
-
- void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers)
- {
- final Set<String> disappeared =
Sets.newHashSet(loadManagementPeons.keySet());
- for (ImmutableDruidServer server : servers) {
- disappeared.remove(server.getName());
- }
- for (String name : disappeared) {
- log.debug("Removing listener for server[%s] which is no longer
there.", name);
- LoadQueuePeon peon = loadManagementPeons.remove(name);
- peon.stop();
- }
- }
- }
-
/**
* Updates replication status of all used segments. This duty must run after
* {@link RunRules} so that the number of required replicas for all segments
@@ -971,6 +763,23 @@ public class DruidCoordinator
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
segmentReplicationStatus = params.getSegmentReplicationStatus();
+
+ // Collect stats for unavailable and under-replicated segments
+ final CoordinatorRunStats stats = params.getCoordinatorStats();
+ getDatasourceToUnavailableSegmentCount().forEach(
+ (dataSource, numUnavailable) -> stats.add(
+ Stats.Segments.UNAVAILABLE,
+ RowKey.of(Dimension.DATASOURCE, dataSource),
+ numUnavailable
+ )
+ );
+ getTierToDatasourceToUnderReplicatedCount(false).forEach(
+ (tier, countsPerDatasource) -> countsPerDatasource.forEach(
+ (dataSource, underReplicatedCount) ->
+ stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED,
tier, dataSource, underReplicatedCount)
+ )
+ );
+
return params;
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 79173683fa..07a7bf1b2e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -332,7 +332,7 @@ public class DruidCoordinatorRuntimeParams
return this;
}
- public Builder
withSnapshotOfDataSourcesWithAllUsedSegments(DataSourcesSnapshot snapshot)
+ public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
{
this.usedSegments =
createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot());
this.dataSourcesSnapshot = snapshot;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
index d0d426f7b5..0dd05cb1b5 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator.balancer;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl =
CostBalancerStrategyFactory.class)
@JsonSubTypes(value = {
@@ -30,7 +33,45 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
@JsonSubTypes.Type(name = "diskNormalized", value =
DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value =
RandomBalancerStrategyFactory.class)
})
-public interface BalancerStrategyFactory
+public abstract class BalancerStrategyFactory
{
- BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
+ private static final Logger log = new Logger(BalancerStrategyFactory.class);
+
+ public abstract BalancerStrategy createBalancerStrategy(int
numBalancerThreads);
+
+ private int cachedBalancerThreadNumber;
+ private ListeningExecutorService balancerExec;
+
+ public void stopExecutor()
+ {
+ if (balancerExec != null) {
+ balancerExec.shutdownNow();
+ balancerExec = null;
+ }
+ }
+
+ protected ListeningExecutorService getOrCreateBalancerExecutor(int
balancerComputeThreads)
+ {
+ if (balancerExec == null) {
+ balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+ } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
+ log.info(
+ "'balancerComputeThreads' has changed from [%d] to [%d].",
+ cachedBalancerThreadNumber, balancerComputeThreads
+ );
+ balancerExec.shutdownNow();
+ balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+ }
+
+ return balancerExec;
+ }
+
+ private ListeningExecutorService createNewBalancerExecutor(int numThreads)
+ {
+ log.info("Creating new balancer executor with [%d] threads.", numThreads);
+ cachedBalancerThreadNumber = numThreads;
+ return MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
+ );
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 89b7cf7546..8c78bff7b6 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -44,7 +44,7 @@ import java.util.concurrent.RejectedExecutionException;
* and will be removed in future releases.
*/
@Deprecated
-public class CachingCostBalancerStrategyFactory implements
BalancerStrategyFactory
+public class CachingCostBalancerStrategyFactory extends BalancerStrategyFactory
{
private static final EmittingLogger LOG = new
EmittingLogger(CachingCostBalancerStrategyFactory.class);
@@ -128,8 +128,9 @@ public class CachingCostBalancerStrategyFactory implements
BalancerStrategyFacto
}
@Override
- public BalancerStrategy createBalancerStrategy(final
ListeningExecutorService exec)
+ public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
+ final ListeningExecutorService exec =
getOrCreateBalancerExecutor(numBalancerThreads);
LOG.warn(
"'cachingCost' balancer strategy has been deprecated as it can lead to"
+ " unbalanced clusters. Use 'cost' strategy instead."
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
index 10d5952390..6894123c57 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
@@ -19,13 +19,11 @@
package org.apache.druid.server.coordinator.balancer;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+public class CostBalancerStrategyFactory extends BalancerStrategyFactory
{
@Override
- public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+ public CostBalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
- return new CostBalancerStrategy(exec);
+ return new
CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
index 7d2f0d96bc..d1881d477c 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
@@ -19,17 +19,16 @@
package org.apache.druid.server.coordinator.balancer;
-import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.common.logger.Logger;
-public class DisabledCachingCostBalancerStrategyFactory implements
BalancerStrategyFactory
+public class DisabledCachingCostBalancerStrategyFactory extends
BalancerStrategyFactory
{
private static final Logger log = new Logger(BalancerStrategyFactory.class);
@Override
- public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+ public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
log.warn("Balancer strategy 'cachingCost' is disabled. Using 'cost'
strategy instead.");
- return new CostBalancerStrategy(exec);
+ return new
CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
index 9c404b97e5..3389f6732e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
@@ -19,13 +19,11 @@
package org.apache.druid.server.coordinator.balancer;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class DiskNormalizedCostBalancerStrategyFactory implements
BalancerStrategyFactory
+public class DiskNormalizedCostBalancerStrategyFactory extends
BalancerStrategyFactory
{
@Override
- public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+ public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
- return new DiskNormalizedCostBalancerStrategy(exec);
+ return new
DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
index 2655df5338..6b97dee713 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
@@ -19,12 +19,10 @@
package org.apache.druid.server.coordinator.balancer;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
+public class RandomBalancerStrategyFactory extends BalancerStrategyFactory
{
@Override
- public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+ public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
return new RandomBalancerStrategy();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index 8c5acaeebb..da2f1e1a04 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -20,18 +20,18 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
-import org.apache.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Set;
@@ -45,11 +45,11 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
{
private static final Logger log = new
Logger(CollectSegmentAndServerStats.class);
- private final DruidCoordinator coordinator;
+ private final LoadQueueTaskMaster taskMaster;
- public CollectSegmentAndServerStats(DruidCoordinator coordinator)
+ public CollectSegmentAndServerStats(LoadQueueTaskMaster taskMaster)
{
- this.coordinator = coordinator;
+ this.taskMaster = taskMaster;
}
@Override
@@ -57,25 +57,15 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
{
params.getDruidCluster().getHistoricals()
.forEach(this::logHistoricalTierStats);
- collectSegmentStats(params);
+ logServerDebuggingInfo(params.getDruidCluster());
+ collectLoadQueueStats(params.getCoordinatorStats());
return params;
}
- private void collectSegmentStats(DruidCoordinatorRuntimeParams params)
+ private void collectLoadQueueStats(CoordinatorRunStats stats)
{
- final CoordinatorRunStats stats = params.getCoordinatorStats();
-
- final DruidCluster cluster = params.getDruidCluster();
- cluster.getHistoricals().forEach((tier, historicals) -> {
- final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
- stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
- long totalCapacity =
historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
- stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
- });
-
- // Collect load queue stats
- coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
+ taskMaster.getAllPeons().forEach((serverName, queuePeon) -> {
final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey,
queuePeon.getSizeOfSegmentsToLoad());
stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey,
queuePeon.getSegmentsToLoad().size());
@@ -86,33 +76,6 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
stats.add(stat, createRowKeyForServer(serverName,
key.getValues()), statValue)
);
});
-
- coordinator.getDatasourceToUnavailableSegmentCount().forEach(
- (dataSource, numUnavailable) -> stats.add(
- Stats.Segments.UNAVAILABLE,
- RowKey.of(Dimension.DATASOURCE, dataSource),
- numUnavailable
- )
- );
-
- coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
- (tier, countsPerDatasource) -> countsPerDatasource.forEach(
- (dataSource, underReplicatedCount) ->
- stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier,
dataSource, underReplicatedCount)
- )
- );
-
- // Collect total segment stats
- params.getUsedSegmentsTimelinesPerDataSource().forEach(
- (dataSource, timeline) -> {
- long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
-
.mapToLong(DataSegment::getSize).sum();
-
- RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
- stats.add(Stats.Segments.USED_BYTES, datasourceKey,
totalSizeOfUsedSegments);
- stats.add(Stats.Segments.USED, datasourceKey,
timeline.getNumObjects());
- }
- );
}
private RowKey createRowKeyForServer(String serverName, Map<Dimension,
String> dimensionValues)
@@ -151,4 +114,19 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
);
}
+ private void logServerDebuggingInfo(DruidCluster cluster)
+ {
+ if (log.isDebugEnabled()) {
+ log.debug("Servers");
+ for (ServerHolder serverHolder : cluster.getAllServers()) {
+ ImmutableDruidServer druidServer = serverHolder.getServer();
+ log.debug(" %s", druidServer);
+ log.debug(" -- DataSources");
+ for (ImmutableDruidDataSource druidDataSource :
druidServer.getDataSources()) {
+ log.debug(" %s", druidDataSource);
+ }
+ }
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
new file mode 100644
index 0000000000..197adce91e
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * This duty does the following:
+ * <ul>
+ * <li>Creates an immutable {@link DruidCluster} consisting of {@link
ServerHolder}s
+ * which represent the current state of the servers in the cluster.</li>
+ * <li>Starts and stops load peons for new and disappeared servers
respectively.</li>
+ * <li>Cancels in-progress loads on all decommissioning servers. This is done
+ * here to ensure that under-replicated segments are assigned to active
servers
+ * in the {@link RunRules} duty after this.</li>
+ * <li>Initializes the {@link BalancerStrategy} for the run.</li>
+ * </ul>
+ */
+public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
+{
+ private static final Logger log = new
Logger(PrepareBalancerAndLoadQueues.class);
+
+ private final LoadQueueTaskMaster taskMaster;
+ private final SegmentLoadQueueManager loadQueueManager;
+ private final ServerInventoryView serverInventoryView;
+ private final BalancerStrategyFactory balancerStrategyFactory;
+
+ public PrepareBalancerAndLoadQueues(
+ LoadQueueTaskMaster taskMaster,
+ SegmentLoadQueueManager loadQueueManager,
+ BalancerStrategyFactory balancerStrategyFactory,
+ ServerInventoryView serverInventoryView
+ )
+ {
+ this.taskMaster = taskMaster;
+ this.loadQueueManager = loadQueueManager;
+ this.balancerStrategyFactory = balancerStrategyFactory;
+ this.serverInventoryView = serverInventoryView;
+ }
+
+ @Override
+ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ {
+ List<ImmutableDruidServer> currentServers = prepareCurrentServers();
+ taskMaster.resetPeonsForNewServers(currentServers);
+
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ final SegmentLoadingConfig segmentLoadingConfig =
params.getSegmentLoadingConfig();
+
+ final DruidCluster cluster = prepareCluster(dynamicConfig,
segmentLoadingConfig, currentServers);
+ cancelLoadsOnDecommissioningServers(cluster);
+
+ final CoordinatorRunStats stats = params.getCoordinatorStats();
+ collectHistoricalStats(cluster, stats);
+ collectUsedSegmentStats(params, stats);
+
+ int numBalancerThreads =
params.getCoordinatorDynamicConfig().getBalancerComputeThreads();
+ final BalancerStrategy balancerStrategy =
balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
+ log.info(
+ "Using balancer strategy [%s] with [%d] threads.",
+ balancerStrategy.getClass().getSimpleName(), numBalancerThreads
+ );
+
+ return params.buildFromExisting()
+ .withDruidCluster(cluster)
+ .withBalancerStrategy(balancerStrategy)
+ .withSegmentAssignerUsing(loadQueueManager)
+ .build();
+ }
+
+ /**
+ * Cancels all load/move operations on decommissioning servers. This should
+ * be done before initializing the SegmentReplicantLookup so that
+ * under-replicated segments can be assigned in the current run itself.
+ */
+ private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
+ {
+ final AtomicInteger cancelledCount = new AtomicInteger(0);
+ final List<ServerHolder> decommissioningServers
+ = cluster.getAllServers().stream()
+ .filter(ServerHolder::isDecommissioning)
+ .collect(Collectors.toList());
+
+ for (ServerHolder server : decommissioningServers) {
+ server.getQueuedSegments().forEach(
+ (segment, action) -> {
+ // Cancel the operation if it is a type of load
+ if (action.isLoad() && server.cancelOperation(action, segment)) {
+ cancelledCount.incrementAndGet();
+ }
+ }
+ );
+ }
+
+ if (cancelledCount.get() > 0) {
+ log.info(
+ "Cancelled [%d] load/move operations on [%d] decommissioning
servers.",
+ cancelledCount.get(), decommissioningServers.size()
+ );
+ }
+ }
+
+ private List<ImmutableDruidServer> prepareCurrentServers()
+ {
+ return serverInventoryView
+ .getInventory()
+ .stream()
+ .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
+ .map(DruidServer::toImmutableDruidServer)
+ .collect(Collectors.toList());
+ }
+
+ private DruidCluster prepareCluster(
+ CoordinatorDynamicConfig dynamicConfig,
+ SegmentLoadingConfig segmentLoadingConfig,
+ List<ImmutableDruidServer> currentServers
+ )
+ {
+ final Set<String> decommissioningServers =
dynamicConfig.getDecommissioningNodes();
+ final DruidCluster.Builder cluster = DruidCluster.builder();
+ for (ImmutableDruidServer server : currentServers) {
+ cluster.add(
+ new ServerHolder(
+ server,
+ taskMaster.getPeonForServer(server),
+ decommissioningServers.contains(server.getHost()),
+ segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
+ segmentLoadingConfig.getMaxLifetimeInLoadQueue()
+ )
+ );
+ }
+ return cluster.build();
+ }
+
+ private void collectHistoricalStats(DruidCluster cluster,
CoordinatorRunStats stats)
+ {
+ cluster.getHistoricals().forEach((tier, historicals) -> {
+ RowKey rowKey = RowKey.of(Dimension.TIER, tier);
+ stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
+
+ long totalCapacity =
historicals.stream().mapToLong(ServerHolder::getMaxSize).sum();
+ stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
+ });
+ }
+
+ private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params,
CoordinatorRunStats stats)
+ {
+ params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource,
timeline) -> {
+ long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
+
.mapToLong(DataSegment::getSize).sum();
+
+ RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+ stats.add(Stats.Segments.USED_BYTES, datasourceKey,
totalSizeOfUsedSegments);
+ stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
+ });
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
index bc68f87bc5..7a97a61fa3 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
@@ -20,22 +20,32 @@
package org.apache.druid.server.coordinator.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Provider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Provides LoadQueuePeons
*/
public class LoadQueueTaskMaster
{
+ private static final Logger log = new Logger(LoadQueueTaskMaster.class);
+
private final Provider<CuratorFramework> curatorFrameworkProvider;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec;
@@ -45,6 +55,11 @@ public class LoadQueueTaskMaster
private final ZkPathsConfig zkPaths;
private final boolean httpLoading;
+ @GuardedBy("this")
+ private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+ private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons =
new ConcurrentHashMap<>();
+
public LoadQueueTaskMaster(
Provider<CuratorFramework> curatorFrameworkProvider,
ObjectMapper jsonMapper,
@@ -65,7 +80,7 @@ public class LoadQueueTaskMaster
this.httpLoading = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
}
- public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
+ private LoadQueuePeon createPeon(ImmutableDruidServer server)
{
if (httpLoading) {
return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient,
config, peonExec, callbackExec);
@@ -81,6 +96,69 @@ public class LoadQueueTaskMaster
}
}
+ public Map<String, LoadQueuePeon> getAllPeons()
+ {
+ return loadManagementPeons;
+ }
+
+ public LoadQueuePeon getPeonForServer(ImmutableDruidServer server)
+ {
+ return loadManagementPeons.get(server.getName());
+ }
+
+ /**
+ * Creates a peon for each of the given servers, if it doesn't already exist
and
+ * removes peons for servers not present in the cluster anymore.
+ * <p>
+ * This method must not run concurrently with {@link #onLeaderStart()} and
+ * {@link #onLeaderStop()} so that there are no stray peons if the
Coordinator
+ * is not leader anymore.
+ */
+ public synchronized void resetPeonsForNewServers(List<ImmutableDruidServer>
currentServers)
+ {
+ if (!isLeader.get()) {
+ return;
+ }
+
+ final Set<String> oldServers =
Sets.newHashSet(loadManagementPeons.keySet());
+
+ // Start peons for new servers
+ for (ImmutableDruidServer server : currentServers) {
+ loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
+ LoadQueuePeon loadQueuePeon = createPeon(server);
+ loadQueuePeon.start();
+ log.debug("Created LoadQueuePeon for server[%s].", server.getName());
+ return loadQueuePeon;
+ });
+ }
+
+ // Remove peons for disappeared servers
+ for (ImmutableDruidServer server : currentServers) {
+ oldServers.remove(server.getName());
+ }
+ for (String name : oldServers) {
+ log.debug("Removing LoadQueuePeon for disappeared server[%s].", name);
+ LoadQueuePeon peon = loadManagementPeons.remove(name);
+ peon.stop();
+ }
+ }
+
+ public synchronized void onLeaderStart()
+ {
+ isLeader.set(true);
+ }
+
+ /**
+ * Stops and removes all peons.
+ */
+ public synchronized void onLeaderStop()
+ {
+ isLeader.set(false);
+
+ loadManagementPeons.values().forEach(LoadQueuePeon::stop);
+ loadManagementPeons.clear();
+ }
+
public boolean isHttpLoading()
{
return httpLoading;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 249dea2ce9..d1e5b7c5b2 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
@@ -51,7 +50,6 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
@@ -583,60 +581,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
- @Test
- public void testBalancerThreadNumber()
- {
- ScheduledExecutorFactory scheduledExecutorFactory =
EasyMock.createNiceMock(ScheduledExecutorFactory.class);
- EasyMock.replay(scheduledExecutorFactory);
-
- DruidCoordinator c = new DruidCoordinator(
- druidCoordinatorConfig,
- EasyMock.createNiceMock(JacksonConfigManager.class),
- null,
- null,
- null,
- null,
- scheduledExecutorFactory,
- null,
- loadQueueTaskMaster,
- null,
- null,
- null,
- null,
- null,
- new CoordinatorCustomDutyGroups(ImmutableSet.of()),
- new RandomBalancerStrategyFactory(),
- null,
- null,
- null
- );
-
- // before initialization
- Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
- Assert.assertNull(c.getBalancerExec());
-
- // first initialization
- c.createBalancerStrategy(5);
- Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
- ListeningExecutorService firstExec = c.getBalancerExec();
- Assert.assertNotNull(firstExec);
-
- // second initialization, expect no changes as cachedBalancerThreadNumber
is not changed
- c.createBalancerStrategy(5);
- Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
- ListeningExecutorService secondExec = c.getBalancerExec();
- Assert.assertNotNull(secondExec);
- Assert.assertSame(firstExec, secondExec);
-
- // third initialization, expect executor recreated as
cachedBalancerThreadNumber is changed to 10
- c.createBalancerStrategy(10);
- Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
- ListeningExecutorService thirdExec = c.getBalancerExec();
- Assert.assertNotNull(thirdExec);
- Assert.assertNotSame(secondExec, thirdExec);
- Assert.assertNotSame(firstExec, thirdExec);
- }
-
@Test
public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
{
@@ -767,6 +711,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
{
// Some nessesary setup to start the Coordinator
+ setupPeons(Collections.emptyMap());
JacksonConfigManager configManager =
EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
@@ -805,10 +750,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
EasyMock.expect(segmentsMetadataManager.iterateAllUsedSegments())
.andReturn(Collections.singletonList(dataSegment)).anyTimes();
- EasyMock.replay(segmentsMetadataManager);
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes();
- EasyMock.replay(serverInventoryView);
+ EasyMock.replay(serverInventoryView, loadQueueTaskMaster,
segmentsMetadataManager);
// Create CoordinatorCustomDutyGroups
// We will have two groups and each group has one duty
@@ -942,7 +886,16 @@ public class DruidCoordinatorTest extends CuratorTestBase
private void setupPeons(Map<String, LoadQueuePeon> peonMap)
{
-
EasyMock.expect(loadQueueTaskMaster.giveMePeon(EasyMock.anyObject())).andAnswer(
+ loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ loadQueueTaskMaster.onLeaderStart();
+ EasyMock.expectLastCall().anyTimes();
+ loadQueueTaskMaster.onLeaderStop();
+ EasyMock.expectLastCall().anyTimes();
+
+
EasyMock.expect(loadQueueTaskMaster.getAllPeons()).andReturn(peonMap).anyTimes();
+
+
EasyMock.expect(loadQueueTaskMaster.getPeonForServer(EasyMock.anyObject())).andAnswer(
() -> peonMap.get(((ImmutableDruidServer)
EasyMock.getCurrentArgument(0)).getName())
).anyTimes();
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
index d08b8ff104..f8234b36da 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
@@ -22,42 +22,45 @@ package org.apache.druid.server.coordinator.balancer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class BalancerStrategyFactoryTest
{
private final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
- private ListeningExecutorService executorService;
-
- @Before
- public void setup()
- {
- executorService = MoreExecutors.listeningDecorator(
- new BlockingExecutorService("StrategyFactoryTest-%s")
- );
- }
-
- @After
- public void tearDown()
- {
- executorService.shutdownNow();
- }
-
@Test
public void testCachingCostStrategyFallsBackToCost() throws
JsonProcessingException
{
final String json = "{\"strategy\":\"cachingCost\"}";
BalancerStrategyFactory factory = MAPPER.readValue(json,
BalancerStrategyFactory.class);
- BalancerStrategy strategy =
factory.createBalancerStrategy(executorService);
+ BalancerStrategy strategy = factory.createBalancerStrategy(1);
Assert.assertTrue(strategy instanceof CostBalancerStrategy);
Assert.assertFalse(strategy instanceof CachingCostBalancerStrategy);
+
+ factory.stopExecutor();
+ }
+
+ @Test
+ public void testBalancerFactoryCreatesNewExecutorIfNumThreadsChanges()
+ {
+ BalancerStrategyFactory factory = new CostBalancerStrategyFactory();
+ ListeningExecutorService exec1 = factory.getOrCreateBalancerExecutor(1);
+ ListeningExecutorService exec2 = factory.getOrCreateBalancerExecutor(2);
+
+ Assert.assertTrue(exec1.isShutdown());
+ Assert.assertNotSame(exec1, exec2);
+
+ ListeningExecutorService exec3 = factory.getOrCreateBalancerExecutor(3);
+ Assert.assertTrue(exec2.isShutdown());
+ Assert.assertNotSame(exec2, exec3);
+
+ ListeningExecutorService exec4 = factory.getOrCreateBalancerExecutor(3);
+ Assert.assertFalse(exec3.isShutdown());
+ Assert.assertSame(exec3, exec4);
+
+ factory.stopExecutor();
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index aada7b3214..1f51565bcd 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
-import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -92,7 +92,7 @@ public class BalanceSegmentsTest
server4 = new DruidServer("server4", "server4", null, 100L,
ServerType.HISTORICAL, "normal", 0);
balancerStrategyExecutor =
MoreExecutors.listeningDecorator(Execs.multiThreaded(1,
"BalanceSegmentsTest-%d"));
- balancerStrategy = new
CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+ balancerStrategy = new CostBalancerStrategy(balancerStrategyExecutor);
broadcastDatasources = Collections.singleton("datasourceBroadcast");
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
index 1090ef573e..9921281fde 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
@@ -19,14 +19,14 @@
package org.apache.druid.server.coordinator.duty;
-import it.unimi.dsi.fastutil.objects.Object2IntMaps;
-import it.unimi.dsi.fastutil.objects.Object2LongMaps;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.junit.Assert;
@@ -36,13 +36,11 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Collections;
-
@RunWith(MockitoJUnitRunner.class)
public class CollectSegmentAndServerStatsTest
{
@Mock
- private DruidCoordinator mockDruidCoordinator;
+ private LoadQueueTaskMaster mockTaskMaster;
@Test
public void testCollectedSegmentStats()
@@ -55,17 +53,15 @@ public class CollectSegmentAndServerStatsTest
.withSegmentAssignerUsing(new
SegmentLoadQueueManager(null, null, null))
.build();
- Mockito.when(mockDruidCoordinator.getDatasourceToUnavailableSegmentCount())
- .thenReturn(Object2IntMaps.singleton("ds", 10));
-
Mockito.when(mockDruidCoordinator.getTierToDatasourceToUnderReplicatedCount(false))
- .thenReturn(Collections.singletonMap("ds",
Object2LongMaps.singleton("tier1", 100)));
+ Mockito.when(mockTaskMaster.getAllPeons())
+ .thenReturn(ImmutableMap.of("server1", new TestLoadQueuePeon()));
- CoordinatorDuty duty = new
CollectSegmentAndServerStats(mockDruidCoordinator);
+ CoordinatorDuty duty = new CollectSegmentAndServerStats(mockTaskMaster);
DruidCoordinatorRuntimeParams params = duty.run(runtimeParams);
CoordinatorRunStats stats = params.getCoordinatorStats();
- Assert.assertTrue(stats.hasStat(Stats.Segments.UNAVAILABLE));
- Assert.assertTrue(stats.hasStat(Stats.Segments.UNDER_REPLICATED));
+ Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD));
+ Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_DROP));
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 6884d25975..2926dbd6d7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -1774,7 +1774,7 @@ public class CompactSegmentsTest
{
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
- .withSnapshotOfDataSourcesWithAllUsedSegments(dataSources)
+ .withDataSourcesSnapshot(dataSources)
.withCompactionConfig(
new CoordinatorCompactionConfig(
compactionConfigs,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
index ce89cd616b..acbf89e322 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
@@ -91,7 +91,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
- .withSnapshotOfDataSourcesWithAllUsedSegments(
+ .withDataSourcesSnapshot(
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
)
.withDruidCluster(druidCluster)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]