github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1163048337
##########
server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java:
##########
@@ -117,397 +102,290 @@
@Before
public void setUp()
{
- EmittingLogger.registerEmitter(EMITTER);
- EMITTER.start();
- throttler = EasyMock.createMock(ReplicationThrottler.class);
-
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1,
"LoadRuleTest-%d"));
balancerStrategy = new
CostBalancerStrategyFactory().createBalancerStrategy(exec);
cachingCostBalancerStrategy = new
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
+ stateManager = new SegmentStateManager(null, null, null);
}
@After
- public void tearDown() throws Exception
+ public void tearDown()
{
exec.shutdown();
- EMITTER.close();
}
@Test
public void testLoad()
{
-
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
-
final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
- LoadRule rule = createLoadRule(ImmutableMap.of(
- "hot", 1,
- DruidServer.DEFAULT_TIER, 2
- ));
-
- final DataSegment segment = createDataSegment("foo");
-
- throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER,
segment.getId(), "hostNorm");
- EasyMock.expectLastCall().once();
-
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
- .times(3);
+ .times(2);
}
-
- EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon, mockBalancerStrategy);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
- .addTier(
- "hot",
- new ServerHolder(
- new DruidServer("serverHot", "hostHot", null, 1000,
ServerType.HISTORICAL, "hot", 1)
- .toImmutableDruidServer(),
- mockPeon
- )
- )
- .addTier(
- DruidServer.DEFAULT_TIER,
- new ServerHolder(
- new DruidServer(
- "serverNorm",
- "hostNorm",
- null,
- 1000,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- 0
- ).toImmutableDruidServer(),
- mockPeon
- )
- )
+ .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
+ .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
.build();
- CoordinatorStats stats = rule.run(null,
makeCoordinatorRuntimeParams(druidCluster, segment), segment);
+ final DataSegment segment = createDataSegment(DS_WIKI);
+ LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2));
+ CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
- Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT,
"hot"));
- Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT,
DruidServer.DEFAULT_TIER));
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
- EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
+ EasyMock.verify(mockPeon, mockBalancerStrategy);
+ }
+
+ private CoordinatorRunStats runRuleAndGetStats(
+ LoadRule rule,
+ DataSegment segment,
+ DruidCluster cluster
+ )
+ {
+ return runRuleAndGetStats(rule, segment,
makeCoordinatorRuntimeParams(cluster, segment));
+ }
+
+ private CoordinatorRunStats runRuleAndGetStats(
+ LoadRule rule,
+ DataSegment segment,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ ReplicationThrottler throttler = new ReplicationThrottler(
+ Sets.newHashSet(params.getDruidCluster().getTierNames()),
+ dynamicConfig.getReplicationThrottleLimit(),
+ dynamicConfig.getReplicantLifetime(),
+ dynamicConfig.getMaxNonPrimaryReplicantsToLoad()
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4775)
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java:
##########
@@ -163,9 +144,74 @@
.emit();
}
+ segmentLoader.makeAlerts();
+
+ final CoordinatorRunStats stats = segmentLoader.getStats();
+ stats.forEachRowKey(
+ (dimensionValues, statValues) ->
+ log.info("Stats for dimensions [%s]: [%s]", dimensionValues,
statValues)
+ );
+
return params.buildFromExisting()
.withCoordinatorStats(stats)
.withBroadcastDatasources(broadcastDatasources)
+ .withReplicationManager(replicationThrottler)
.build();
}
+
+ /**
+ * Reduces the lifetimes of segments currently being replicated in all the
tiers.
+ * Returns the set of tiers that are currently replicatinng some segments and
+ * won't be eligible for assigning more replicas in this run.
+ */
+ private Set<String> reduceLifetimesAndGetBusyTiers(CoordinatorDynamicConfig
dynamicConfig)
+ {
+ final Set<String> busyTiers = new HashSet<>();
+ stateManager.reduceLifetimesOfReplicatingSegments().forEach((tier,
replicatingState) -> {
+ int numReplicatingSegments = replicatingState.getNumProcessingSegments();
+ if (numReplicatingSegments <= 0) {
+ return;
+ }
+
+ busyTiers.add(tier);
+ log.info(
+ "Skipping replication on tier [%s] as is it still has"
+ + " [%d] segments in queue with lifetime [%d / %d].",
+ tier,
+ numReplicatingSegments,
+ replicatingState.getMinLifetime(),
+ dynamicConfig.getReplicantLifetime()
+ );
+
+ // Create alerts for stuck tiers
+ if (replicatingState.getMinLifetime() <= 0) {
+ log.makeAlert(
+ "Replication queue for tier [%s] has [%d] segments stuck.",
+ tier,
+ replicatingState.getNumExpiredSegments()
+ ).addData("segments", replicatingState.getExpiredSegments()).emit();
+ }
+ });
+
+ return busyTiers;
+ }
+
+ private ReplicationThrottler createReplicationThrottler(
+ Set<String> busyTiers,
+ DruidCluster cluster,
+ CoordinatorDynamicConfig dynamicConfig
+ )
+ {
+ // Tiers that already have some replication in progress are not eligible
for
+ // replication in this coordinator run
+ final Set<String> tiersEligibleForReplication =
Sets.newHashSet(cluster.getTierNames());
+ tiersEligibleForReplication.removeAll(busyTiers);
+
+ return new ReplicationThrottler(
+ tiersEligibleForReplication,
+ dynamicConfig.getReplicationThrottleLimit(),
+ dynamicConfig.getReplicantLifetime(),
+ dynamicConfig.getMaxNonPrimaryReplicantsToLoad()
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4771)
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
return this;
}
+ /**
+ * Builds a CoordinatoryDynamicConfig using either the configured values,
or
+ * the default value if not configured.
+ */
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT :
mergeBytesLimit,
- mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null ?
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
- :
percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
- replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME :
replicantLifetime,
- replicationThrottleLimit == null ?
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
- balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS :
balancerComputeThreads,
- emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS :
emitBalancingStats,
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+ Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+ ),
+ valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+ valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+ valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+ valueOrDefault(useBatchedSegmentSampler,
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+ valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+ valueOrDefault(replicationThrottleLimit,
Defaults.REPLICATION_THROTTLE_LIMIT),
+ valueOrDefault(balancerComputeThreads,
Defaults.BALANCER_COMPUTE_THREADS),
+ valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
specificDataSourcesToKillUnusedSegmentsIn,
dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
- : maxSegmentsInNodeLoadingQueue,
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
- :
maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+ ),
+ valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+ valueOrDefault(replicateAfterLoadTimeout,
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+ valueOrDefault(maxNonPrimaryReplicantsToLoad,
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+ valueOrDefault(useRoundRobinSegmentAssignment,
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
);
}
+ private static <T> T valueOrDefault(@Nullable T value, @NotNull T
defaultValue)
+ {
+ return value == null ? defaultValue : value;
+ }
+
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? defaults.getMergeBytesLimit() :
mergeBytesLimit,
- mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null
- ? defaults.getPercentOfSegmentsToConsiderPerMove()
- : percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
- replicantLifetime == null ? defaults.getReplicantLifetime() :
replicantLifetime,
- replicationThrottleLimit == null ?
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
- balancerComputeThreads == null ?
defaults.getBalancerComputeThreads() : balancerComputeThreads,
- emitBalancingStats == null ? defaults.emitBalancingStats() :
emitBalancingStats,
- specificDataSourcesToKillUnusedSegmentsIn == null
- ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
- : specificDataSourcesToKillUnusedSegmentsIn,
- dataSourcesToNotKillStalePendingSegmentsIn == null
- ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
- : dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? defaults.getMaxSegmentsInNodeLoadingQueue()
- : maxSegmentsInNodeLoadingQueue,
- decommissioningNodes == null ? defaults.getDecommissioningNodes() :
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? defaults.getPauseCoordination() :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null
- ? defaults.getMaxNonPrimaryReplicantsToLoad()
- : maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+ ),
+ valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+ valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+ valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
defaults.getPercentOfSegmentsToConsiderPerMove()),
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getPercentOfSegmentsToConsiderPerMove](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4766)
##########
server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java:
##########
@@ -497,4 +463,27 @@
.size(0)
.build();
}
+
+ private SegmentLoader createSegmentLoader(
+ SegmentStateManager stateManager,
+ DruidCoordinatorRuntimeParams params,
+ String... tiersEligibleForReplication
+ )
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ ReplicationThrottler throttler = new ReplicationThrottler(
+ new HashSet<>(Arrays.asList(tiersEligibleForReplication)),
+ dynamicConfig.getReplicationThrottleLimit(),
+ dynamicConfig.getReplicantLifetime(),
+ dynamicConfig.getMaxNonPrimaryReplicantsToLoad()
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4773)
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -20,292 +20,326 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.BalancerStrategy;
-import org.apache.druid.server.coordinator.CoordinatorStats;
-import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.LoadPeonCallback;
-import org.apache.druid.server.coordinator.LoadQueuePeon;
+import org.apache.druid.server.coordinator.SegmentLoader;
+import org.apache.druid.server.coordinator.SegmentStateManager;
import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
+ *
*/
public class BalanceSegments implements CoordinatorDuty
{
protected static final EmittingLogger log = new
EmittingLogger(BalanceSegments.class);
+ private final SegmentStateManager stateManager;
- protected final DruidCoordinator coordinator;
-
- protected final Map<String, ConcurrentHashMap<SegmentId,
BalancerSegmentHolder>> currentlyMovingSegments =
- new HashMap<>();
-
- public BalanceSegments(DruidCoordinator coordinator)
+ public BalanceSegments(SegmentStateManager stateManager)
{
- this.coordinator = coordinator;
+ this.stateManager = stateManager;
}
- protected void reduceLifetimes(String tier)
+ /**
+ * Reduces the lifetimes of segments currently being moved in all the tiers.
+ * Raises alerts for segments stuck in the queue.
+ */
+ private void reduceLifetimesAndAlert()
{
- for (BalancerSegmentHolder holder :
currentlyMovingSegments.get(tier).values()) {
- holder.reduceLifetime();
- if (holder.getLifetime() <= 0) {
- log.makeAlert("[%s]: Balancer move segments queue has a segment
stuck", tier)
- .addData("segment", holder.getSegment().getId())
- .addData("server", holder.getFromServer().getMetadata())
- .emit();
+ stateManager.reduceLifetimesOfMovingSegments().forEach((tier, movingState)
-> {
+ int numMovingSegments = movingState.getNumProcessingSegments();
+ if (numMovingSegments <= 0) {
+ return;
}
- }
+
+ // Create alerts for stuck tiers
+ if (movingState.getMinLifetime() <= 0) {
+ log.makeAlert(
+ "Balancing queue for tier [%s] has [%d] segments stuck.",
+ tier,
+ movingState.getNumExpiredSegments()
+ ).addData("segments", movingState.getExpiredSegments()).emit();
+ }
+ });
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
- final CoordinatorStats stats = new CoordinatorStats();
- params.getDruidCluster().getHistoricals().forEach((String tier,
NavigableSet<ServerHolder> servers) -> {
- balanceTier(params, tier, servers, stats);
- });
+ reduceLifetimesAndAlert();
+
+ if (params.getUsedSegments().isEmpty()) {
+ log.info("Skipping balance as there are no used segments.");
+ return params;
+ }
+
+ final DruidCluster cluster = params.getDruidCluster();
+ final int maxSegmentsToMove =
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+ if (maxSegmentsToMove <= 0) {
+ log.info("Skipping balance as maxSegmentsToMove is [%d].",
maxSegmentsToMove);
+ return params;
+ } else {
+ int maxLifetime =
params.getCoordinatorDynamicConfig().getReplicantLifetime();
+ log.info(
+ "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d],
maxLifetime=[%d].",
+ cluster.getTierNames(), maxSegmentsToMove, maxLifetime
+ );
+ }
+
+ final CoordinatorRunStats stats = new CoordinatorRunStats();
+ final SegmentLoader loader = new SegmentLoader(
+ stateManager,
+ params.getDruidCluster(),
+ params.getSegmentReplicantLookup(),
+ params.getReplicationManager(),
+ params.getBalancerStrategy(),
+ params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()
+ );
+
+ cluster.getHistoricals().forEach(
+ (tier, servers) -> balanceTier(params, tier, servers, stats, loader)
+ );
+
+ loader.makeAlerts();
+ stats.accumulate(loader.getStats());
+
+ stats.forEachRowKey(
+ (dimensionValues, statValues) ->
+ log.info("Stats for dimensions [%s] are [%s]", dimensionValues,
statValues)
+ );
+
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
+ /**
+ * Moves as many segments as possible from decommissioning to active servers,
+ * then moves segments amongst active servers.
+ */
private void balanceTier(
DruidCoordinatorRuntimeParams params,
String tier,
SortedSet<ServerHolder> servers,
- CoordinatorStats stats
+ CoordinatorRunStats stats,
+ SegmentLoader loader
)
{
-
- log.info("Balancing segments in tier [%s]", tier);
- if (params.getUsedSegments().size() == 0) {
- log.info("Metadata segments are not available. Cannot balance.");
- // suppress emit zero stats
- return;
- }
- currentlyMovingSegments.computeIfAbsent(tier, t -> new
ConcurrentHashMap<>());
-
- if (!currentlyMovingSegments.get(tier).isEmpty()) {
- reduceLifetimes(tier);
- log.info(
- "[%s]: Still waiting on %,d segments to be moved. Skipping balance.",
- tier,
- currentlyMovingSegments.get(tier).size()
- );
- // suppress emit zero stats
- return;
- }
-
- /*
- Take as many segments from decommissioning servers as
decommissioningMaxPercentOfMaxSegmentsToMove allows and find
- the best location for them on active servers. After that, balance
segments within active servers pool.
- */
Map<Boolean, List<ServerHolder>> partitions =
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
final List<ServerHolder> decommissioningServers = partitions.get(true);
final List<ServerHolder> activeServers = partitions.get(false);
+
log.info(
- "Found %d active servers, %d decommissioning servers",
- activeServers.size(),
- decommissioningServers.size()
+ "Balancing segments in tier [%s] with [%d] activeServers and [%d]
decommissioning servers.",
+ tier, activeServers.size(), decommissioningServers.size()
);
if ((decommissioningServers.isEmpty() && activeServers.size() <= 1) ||
activeServers.isEmpty()) {
- log.warn("[%s]: insufficient active servers. Cannot balance.", tier);
- // suppress emit zero stats
+ log.warn("Skipping balance for tier [%s] as there are [%d] active
servers.", tier, activeServers.size());
return;
}
- int numSegments = 0;
- for (ServerHolder sourceHolder : servers) {
- numSegments += sourceHolder.getServer().getNumSegments();
- }
-
- if (numSegments == 0) {
- log.info("No segments found. Cannot balance.");
- // suppress emit zero stats
+ final int maxSegmentsToMove =
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+ if (maxSegmentsToMove <= 0) {
return;
}
- final int maxSegmentsToMove =
Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(),
numSegments);
+ // Move segments from decommissioning to active servers
+ int maxDecommPercentToMove = params.getCoordinatorDynamicConfig()
+
.getDecommissioningMaxPercentOfMaxSegmentsToMove();
Review Comment:
## Deprecated method or constructor invocation
Invoking
[CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4770)
##########
server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java:
##########
@@ -626,189 +422,115 @@
.withBroadcastDatasources(broadcastDatasources)
.build();
- params = new BalanceSegmentsTester(coordinator).run(params);
- Assert.assertEquals(2L,
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+ CoordinatorRunStats stats = runBalancer(params);
+ Assert.assertEquals(2L, stats.getTieredStat(Stats.Segments.MOVED,
"normal"));
}
- private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
- List<ImmutableDruidServer> druidServers,
- List<LoadQueuePeon> peons
- )
+ private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
{
- return defaultRuntimeParamsBuilder(
- druidServers,
- peons,
- druidServers.stream().map(s -> false).collect(Collectors.toList())
- );
+ params = new BalanceSegments(stateManager).run(params);
+ if (params == null) {
+ Assert.fail("BalanceSegments duty returned null params");
+ return new CoordinatorRunStats();
+ } else {
+ return params.getCoordinatorStats();
+ }
}
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
- List<ImmutableDruidServer> druidServers,
- List<LoadQueuePeon> peons,
- List<Boolean> decommissioning
+ ServerHolder... servers
)
{
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
- .withDruidCluster(
- DruidClusterBuilder
- .newBuilder()
- .addTier(
- "normal",
- IntStream
- .range(0, druidServers.size())
- .mapToObj(i -> new ServerHolder(druidServers.get(i),
peons.get(i), decommissioning.get(i)))
- .toArray(ServerHolder[]::new)
- )
- .build()
- )
- .withLoadManagementPeons(
- IntStream
- .range(0, peons.size())
- .boxed()
- .collect(Collectors.toMap(i -> String.valueOf(i + 1),
peons::get))
- )
- .withUsedSegmentsInTest(segments)
-
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+ .withDruidCluster(DruidClusterBuilder.newBuilder().addTier("normal",
servers).build())
+ .withUsedSegmentsInTest(allSegments)
.withBroadcastDatasources(broadcastDatasources)
- .withBalancerStrategy(balancerStrategy);
+ .withBalancerStrategy(balancerStrategy)
+ .withReplicationManager(createReplicationThrottler());
}
- private static void mockDruidServer(
- ImmutableDruidServer druidServer,
- String name,
- String tier,
- long currentSize,
- long maxSize,
- List<DataSegment> segments
- )
+ private ServerHolder createHolder(DruidServer server, DataSegment...
loadedSegments)
{
- EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
- EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
-
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
- EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
- ImmutableDruidServerTests.expectSegments(druidServer, segments);
- EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
-
EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
- if (!segments.isEmpty()) {
- segments.forEach(
- s ->
EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes()
- );
- }
-
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
- EasyMock.replay(druidServer);
+ return createHolder(server, false, loadedSegments);
}
- private static void mockCoordinator(DruidCoordinator coordinator)
+ private ServerHolder createHolder(DruidServer server, boolean
isDecommissioning, DataSegment... loadedSegments)
{
- coordinator.moveSegment(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- );
- EasyMock.expectLastCall().anyTimes();
- EasyMock.replay(coordinator);
+ return createHolder(server, 0, isDecommissioning, loadedSegments);
}
- private static class PredefinedPickOrderBalancerStrategy implements
BalancerStrategy
+ private ServerHolder createHolder(
+ DruidServer server,
+ int maxSegmentsInLoadQueue,
+ boolean isDecommissioning,
+ DataSegment... loadedSegments
+ )
{
- private final BalancerStrategy delegate;
- private final List<BalancerSegmentHolder> pickOrder;
- private final AtomicInteger pickCounter = new AtomicInteger(0);
-
- PredefinedPickOrderBalancerStrategy(
- BalancerStrategy delegate,
- List<BalancerSegmentHolder> pickOrder
- )
- {
- this.delegate = delegate;
- this.pickOrder = pickOrder;
- }
-
- @Override
- public ServerHolder findNewSegmentHomeBalancer(DataSegment
proposalSegment, List<ServerHolder> serverHolders)
- {
- return delegate.findNewSegmentHomeBalancer(proposalSegment,
serverHolders);
- }
-
- @Override
- public ServerHolder findNewSegmentHomeReplicator(DataSegment
proposalSegment, List<ServerHolder> serverHolders)
- {
- return delegate.findNewSegmentHomeReplicator(proposalSegment,
serverHolders);
- }
-
- @Override
- public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
- List<ServerHolder> serverHolders,
- Set<String> broadcastDatasources,
- int numberOfSegments
- )
- {
- return pickOrder.iterator();
- }
-
- @Override
- public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
- List<ServerHolder> serverHolders,
- Set<String> broadcastDatasources,
- double percentOfSegmentsToConsider
- )
- {
- return pickOrder.iterator();
+ for (DataSegment segment : loadedSegments) {
+ server.addDataSegment(segment);
}
- @Override
- public void emitStats(String tier, CoordinatorStats stats,
List<ServerHolder> serverHolderList)
- {
- delegate.emitStats(tier, stats, serverHolderList);
- }
+ return new ServerHolder(
+ server.toImmutableDruidServer(),
+ new LoadQueuePeonTester(),
+ isDecommissioning,
+ maxSegmentsInLoadQueue
+ );
}
- private DruidCoordinatorRuntimeParams
setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(int percent)
+ private IExpectationSetters<Iterator<BalancerSegmentHolder>>
expectPickLoadingSegmentsAndReturnEmpty(
+ BalancerStrategy strategy
+ )
{
- mockDruidServer(druidServer1, "1", "normal", 30L, 100L,
Arrays.asList(segment1, segment3));
- mockDruidServer(druidServer2, "2", "normal", 30L, 100L,
Arrays.asList(segment2, segment3));
- mockDruidServer(druidServer3, "3", "normal", 0L, 100L,
Collections.emptyList());
-
- EasyMock.replay(druidServer4);
-
- mockCoordinator(coordinator);
+ return EasyMock.expect(
+ strategy.pickSegmentsToMove(
+ EasyMock.anyObject(),
+ EasyMock.anyObject(),
+ EasyMock.anyInt(),
+ EasyMock.eq(true)
+ )
+ ).andReturn(Collections.emptyIterator());
+ }
- // either decommissioning servers list or acitve ones (ie servers list is
[2] or [1, 3])
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(
+ private IExpectationSetters<Iterator<BalancerSegmentHolder>>
expectPickLoadedSegmentsAndReturn(
+ BalancerStrategy strategy,
+ BalancerSegmentHolder... pickedLoadedSegments
+ )
+ {
+ return EasyMock.expect(
strategy.pickSegmentsToMove(
- ImmutableList.of(
- new ServerHolder(druidServer2, peon2, true)
- ),
- broadcastDatasources,
- 100.0
+ EasyMock.anyObject(),
+ EasyMock.anyObject(),
+ EasyMock.anyInt(),
+ EasyMock.eq(false)
)
- ).andReturn(
- ImmutableList.of(new BalancerSegmentHolder(druidServer2,
segment2)).iterator()
+ ).andReturn(Arrays.asList(pickedLoadedSegments).iterator());
+ }
+
+ private ReplicationThrottler createReplicationThrottler()
+ {
+ CoordinatorDynamicConfig dynamicConfig =
CoordinatorDynamicConfig.builder().build();
+ return new ReplicationThrottler(
+ Collections.singleton("normal"),
+ dynamicConfig.getReplicationThrottleLimit(),
+ dynamicConfig.getReplicantLifetime(),
+ dynamicConfig.getMaxNonPrimaryReplicantsToLoad()
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4772)
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
return this;
}
+ /**
+ * Builds a CoordinatoryDynamicConfig using either the configured values,
or
+ * the default value if not configured.
+ */
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT :
mergeBytesLimit,
- mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null ?
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
- :
percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
- replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME :
replicantLifetime,
- replicationThrottleLimit == null ?
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
- balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS :
balancerComputeThreads,
- emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS :
emitBalancingStats,
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+ Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+ ),
+ valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+ valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+ valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+ valueOrDefault(useBatchedSegmentSampler,
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+ valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+ valueOrDefault(replicationThrottleLimit,
Defaults.REPLICATION_THROTTLE_LIMIT),
+ valueOrDefault(balancerComputeThreads,
Defaults.BALANCER_COMPUTE_THREADS),
+ valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
specificDataSourcesToKillUnusedSegmentsIn,
dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
- : maxSegmentsInNodeLoadingQueue,
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
- :
maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+ ),
+ valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+ valueOrDefault(replicateAfterLoadTimeout,
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+ valueOrDefault(maxNonPrimaryReplicantsToLoad,
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+ valueOrDefault(useRoundRobinSegmentAssignment,
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
);
}
+ private static <T> T valueOrDefault(@Nullable T value, @NotNull T
defaultValue)
+ {
+ return value == null ? defaultValue : value;
+ }
+
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? defaults.getMergeBytesLimit() :
mergeBytesLimit,
- mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null
- ? defaults.getPercentOfSegmentsToConsiderPerMove()
- : percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
- replicantLifetime == null ? defaults.getReplicantLifetime() :
replicantLifetime,
- replicationThrottleLimit == null ?
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
- balancerComputeThreads == null ?
defaults.getBalancerComputeThreads() : balancerComputeThreads,
- emitBalancingStats == null ? defaults.emitBalancingStats() :
emitBalancingStats,
- specificDataSourcesToKillUnusedSegmentsIn == null
- ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
- : specificDataSourcesToKillUnusedSegmentsIn,
- dataSourcesToNotKillStalePendingSegmentsIn == null
- ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
- : dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? defaults.getMaxSegmentsInNodeLoadingQueue()
- : maxSegmentsInNodeLoadingQueue,
- decommissioningNodes == null ? defaults.getDecommissioningNodes() :
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? defaults.getPauseCoordination() :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null
- ? defaults.getMaxNonPrimaryReplicantsToLoad()
- : maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+ ),
+ valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+ valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+ valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
defaults.getPercentOfSegmentsToConsiderPerMove()),
+ valueOrDefault(useBatchedSegmentSampler,
defaults.useBatchedSegmentSampler()),
+ valueOrDefault(replicantLifetime, defaults.getReplicantLifetime()),
+ valueOrDefault(replicationThrottleLimit,
defaults.getReplicationThrottleLimit()),
+ valueOrDefault(balancerComputeThreads,
defaults.getBalancerComputeThreads()),
+ valueOrDefault(emitBalancingStats, defaults.emitBalancingStats()),
+ valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn,
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
+ valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn,
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
defaults.getMaxSegmentsInNodeLoadingQueue()),
+ valueOrDefault(decommissioningNodes,
defaults.getDecommissioningNodes()),
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
+ ),
+ valueOrDefault(pauseCoordination, defaults.getPauseCoordination()),
+ valueOrDefault(replicateAfterLoadTimeout,
defaults.getReplicateAfterLoadTimeout()),
+ valueOrDefault(maxNonPrimaryReplicantsToLoad,
defaults.getMaxNonPrimaryReplicantsToLoad()),
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4769)
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
return this;
}
+ /**
+ * Builds a CoordinatoryDynamicConfig using either the configured values,
or
+ * the default value if not configured.
+ */
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT :
mergeBytesLimit,
- mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null ?
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
- :
percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
- replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME :
replicantLifetime,
- replicationThrottleLimit == null ?
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
- balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS :
balancerComputeThreads,
- emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS :
emitBalancingStats,
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+ Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+ ),
+ valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+ valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+ valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+ valueOrDefault(useBatchedSegmentSampler,
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+ valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+ valueOrDefault(replicationThrottleLimit,
Defaults.REPLICATION_THROTTLE_LIMIT),
+ valueOrDefault(balancerComputeThreads,
Defaults.BALANCER_COMPUTE_THREADS),
+ valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
specificDataSourcesToKillUnusedSegmentsIn,
dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
- : maxSegmentsInNodeLoadingQueue,
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
- :
maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+ ),
+ valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+ valueOrDefault(replicateAfterLoadTimeout,
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+ valueOrDefault(maxNonPrimaryReplicantsToLoad,
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+ valueOrDefault(useRoundRobinSegmentAssignment,
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
);
}
+ private static <T> T valueOrDefault(@Nullable T value, @NotNull T
defaultValue)
+ {
+ return value == null ? defaultValue : value;
+ }
+
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? defaults.getMergeBytesLimit() :
mergeBytesLimit,
- mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null
- ? defaults.getPercentOfSegmentsToConsiderPerMove()
- : percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
- replicantLifetime == null ? defaults.getReplicantLifetime() :
replicantLifetime,
- replicationThrottleLimit == null ?
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
- balancerComputeThreads == null ?
defaults.getBalancerComputeThreads() : balancerComputeThreads,
- emitBalancingStats == null ? defaults.emitBalancingStats() :
emitBalancingStats,
- specificDataSourcesToKillUnusedSegmentsIn == null
- ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
- : specificDataSourcesToKillUnusedSegmentsIn,
- dataSourcesToNotKillStalePendingSegmentsIn == null
- ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
- : dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? defaults.getMaxSegmentsInNodeLoadingQueue()
- : maxSegmentsInNodeLoadingQueue,
- decommissioningNodes == null ? defaults.getDecommissioningNodes() :
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? defaults.getPauseCoordination() :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null
- ? defaults.getMaxNonPrimaryReplicantsToLoad()
- : maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+ ),
+ valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+ valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+ valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
defaults.getPercentOfSegmentsToConsiderPerMove()),
+ valueOrDefault(useBatchedSegmentSampler,
defaults.useBatchedSegmentSampler()),
+ valueOrDefault(replicantLifetime, defaults.getReplicantLifetime()),
+ valueOrDefault(replicationThrottleLimit,
defaults.getReplicationThrottleLimit()),
+ valueOrDefault(balancerComputeThreads,
defaults.getBalancerComputeThreads()),
+ valueOrDefault(emitBalancingStats, defaults.emitBalancingStats()),
+ valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn,
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
+ valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn,
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
defaults.getMaxSegmentsInNodeLoadingQueue()),
+ valueOrDefault(decommissioningNodes,
defaults.getDecommissioningNodes()),
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
Review Comment:
## Deprecated method or constructor invocation
Invoking
[CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4768)
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
return this;
}
+ /**
+ * Builds a CoordinatoryDynamicConfig using either the configured values,
or
+ * the default value if not configured.
+ */
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT :
mergeBytesLimit,
- mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null ?
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
- :
percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
- replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME :
replicantLifetime,
- replicationThrottleLimit == null ?
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
- balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS :
balancerComputeThreads,
- emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS :
emitBalancingStats,
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+ Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+ ),
+ valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+ valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+ valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+ valueOrDefault(useBatchedSegmentSampler,
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+ valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+ valueOrDefault(replicationThrottleLimit,
Defaults.REPLICATION_THROTTLE_LIMIT),
+ valueOrDefault(balancerComputeThreads,
Defaults.BALANCER_COMPUTE_THREADS),
+ valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
specificDataSourcesToKillUnusedSegmentsIn,
dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
- : maxSegmentsInNodeLoadingQueue,
+ valueOrDefault(maxSegmentsInNodeLoadingQueue,
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
- :
maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ decommissioningMaxPercentOfMaxSegmentsToMove,
+ Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+ ),
+ valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+ valueOrDefault(replicateAfterLoadTimeout,
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+ valueOrDefault(maxNonPrimaryReplicantsToLoad,
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+ valueOrDefault(useRoundRobinSegmentAssignment,
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
);
}
+ private static <T> T valueOrDefault(@Nullable T value, @NotNull T
defaultValue)
+ {
+ return value == null ? defaultValue : value;
+ }
+
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
{
return new CoordinatorDynamicConfig(
- leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
- ?
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
- : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- mergeBytesLimit == null ? defaults.getMergeBytesLimit() :
mergeBytesLimit,
- mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() :
mergeSegmentsLimit,
- maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null
- ? defaults.getPercentOfSegmentsToConsiderPerMove()
- : percentOfSegmentsToConsiderPerMove,
- useBatchedSegmentSampler == null ?
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
- replicantLifetime == null ? defaults.getReplicantLifetime() :
replicantLifetime,
- replicationThrottleLimit == null ?
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
- balancerComputeThreads == null ?
defaults.getBalancerComputeThreads() : balancerComputeThreads,
- emitBalancingStats == null ? defaults.emitBalancingStats() :
emitBalancingStats,
- specificDataSourcesToKillUnusedSegmentsIn == null
- ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
- : specificDataSourcesToKillUnusedSegmentsIn,
- dataSourcesToNotKillStalePendingSegmentsIn == null
- ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
- : dataSourcesToNotKillStalePendingSegmentsIn,
- maxSegmentsInNodeLoadingQueue == null
- ? defaults.getMaxSegmentsInNodeLoadingQueue()
- : maxSegmentsInNodeLoadingQueue,
- decommissioningNodes == null ? defaults.getDecommissioningNodes() :
decommissioningNodes,
- decommissioningMaxPercentOfMaxSegmentsToMove == null
- ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
- : decommissioningMaxPercentOfMaxSegmentsToMove,
- pauseCoordination == null ? defaults.getPauseCoordination() :
pauseCoordination,
- replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null
- ? defaults.getMaxNonPrimaryReplicantsToLoad()
- : maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ?
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+ valueOrDefault(
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+ ),
+ valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+ valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+ valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+ valueOrDefault(percentOfSegmentsToConsiderPerMove,
defaults.getPercentOfSegmentsToConsiderPerMove()),
+ valueOrDefault(useBatchedSegmentSampler,
defaults.useBatchedSegmentSampler()),
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.useBatchedSegmentSampler](1) should be
avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4767)
##########
server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java:
##########
@@ -402,18 +409,40 @@
largeSegments.get(2),
largeSegments2.get(0),
largeSegments2.get(1)
- ),
- smallSegment
+ )
);
- Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
- Assert.assertFalse(stats.hasPerTierStats());
-
+ Assert.assertEquals(5L,
stats.getDataSourceStat(Stats.Segments.ASSIGNED_BROADCAST, DS_SMALL));
Assert.assertTrue(
druidCluster
.getAllServers()
.stream()
.allMatch(holder -> holder.isLoadingSegment(smallSegment) ||
holder.isServingSegment(smallSegment))
);
}
+
+ private CoordinatorRunStats runRuleAndGetStats(
+ Rule rule,
+ DataSegment segment,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ ReplicationThrottler throttler = new ReplicationThrottler(
+ Sets.newHashSet(params.getDruidCluster().getTierNames()),
+ dynamicConfig.getReplicationThrottleLimit(),
+ dynamicConfig.getReplicantLifetime(),
+ dynamicConfig.getMaxNonPrimaryReplicantsToLoad()
Review Comment:
## Deprecated method or constructor invocation
Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1)
should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4774)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]