This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 133054bf27 Make batched segment sampling the default, minor cleanup of coordinator config (#13391)
133054bf27 is described below
commit 133054bf27dd680fb281e440a7a9123d6e7818c5
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Nov 21 20:31:46 2022 +0530
Make batched segment sampling the default, minor cleanup of coordinator config (#13391)
Batched segment sampling performs significantly better than the older sampling method
when there are a large number of used segments, and it avoids picking duplicate segments.
Changes:
- Make batch segment sampling the default
- Deprecate the property `useBatchedSegmentSampler`
- Remove unused coordinator config `druid.coordinator.loadqueuepeon.repeatDelay`
- Cleanup `KillUnusedSegments`
- Simplify `KillUnusedSegmentsTest`, add better tests, remove redundant tests
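
For operators, the practical effect is that no dynamic-config override is needed any more.
A minimal sketch, in Java, of what enabling batched sampling used to look like via the
builder touched by this patch (the variable name is an assumption for illustration):

    // Sketch only: as of this commit batched sampling is the default
    // (DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = true), so this override is
    // redundant and the builder method below is deprecated.
    CoordinatorDynamicConfig dynamicConfig =
        CoordinatorDynamicConfig.builder()
            .withUseBatchedSegmentSampler(true)   // no longer necessary
            .withMaxSegmentsToMove(100)
            .build();

Leaving `useBatchedSegmentSampler` unset now yields the batched behaviour; setting it to
false temporarily restores the old sampler until the deprecated property is removed.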
---
.../druid/server/coordinator/BalancerStrategy.java | 8 +-
.../coordinator/CoordinatorDynamicConfig.java | 16 +-
.../server/coordinator/DruidCoordinatorConfig.java | 6 -
.../coordinator/duty/KillUnusedSegments.java | 103 +++--
.../server/coordinator/BalanceSegmentsTest.java | 15 +-
.../coordinator/CuratorDruidCoordinatorTest.java | 1 -
.../coordinator/DruidCoordinatorConfigTest.java | 2 -
.../server/coordinator/DruidCoordinatorTest.java | 2 -
.../server/coordinator/HttpLoadQueuePeonTest.java | 1 -
.../server/coordinator/LoadQueuePeonTest.java | 3 -
.../server/coordinator/LoadQueuePeonTester.java | 1 -
.../coordinator/ReservoirSegmentSamplerTest.java | 347 +++++++++--------
.../coordinator/TestDruidCoordinatorConfig.java | 18 -
.../coordinator/duty/KillUnusedSegmentsTest.java | 414 +++++++++------------
.../simulate/CoordinatorSimulationBuilder.java | 1 -
.../server/http/CoordinatorDynamicConfigTest.java | 24 +-
.../coordinator-dynamic-config.tsx | 2 +-
17 files changed, 439 insertions(+), 525 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index 6478614ba5..4a48137a88 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -105,9 +105,7 @@ public interface BalancerStrategy
}
/**
- * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. This
- * is the deprecated way of picking a segment to move. pickSegmentsToMove(List<ServerHoler>, Set<String>, int) uses
- * a more performant bathced sampling method that will become the default picking mode in the future.
+ * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy.
*
* @param serverHolders set of historicals to consider for moving segments
* @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
@@ -122,6 +120,10 @@ public interface BalancerStrategy
* for implementations of this method.
* @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently
* reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty).
+ *
+ * @deprecated Use {@link #pickSegmentsToMove(List, Set, int)} instead as it is
+ * a much more performant sampling method which does not allow duplicates. This
+ * method will be removed in future releases.
*/
@Deprecated
default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
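
As a reading aid, a minimal sketch of how the batched overload referenced by the new
@deprecated note is invoked. The variable names (strategy, serverHolders,
broadcastDatasources, maxSegmentsToMove) are assumptions for illustration, and the
meaning of the int argument as a batch size is inferred from the test changes below:

    // Sketch only: batched reservoir sampling returns up to maxSegmentsToMove
    // candidate segments in one pass, without duplicates.
    Iterator<BalancerSegmentHolder> candidates =
        strategy.pickSegmentsToMove(serverHolders, broadcastDatasources, maxSegmentsToMove);
    while (candidates.hasNext()) {
      BalancerSegmentHolder candidate = candidates.next();
      // candidate.getSegment() is the segment to move,
      // candidate.getFromServer() is the server it currently resides on.
    }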
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 475a8f88cd..b9f0d490a3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -56,6 +56,7 @@ public class CoordinatorDynamicConfig
private final int maxSegmentsToMove;
@Deprecated
private final double percentOfSegmentsToConsiderPerMove;
+ @Deprecated
private final boolean useBatchedSegmentSampler;
private final int replicantLifetime;
private final int replicationThrottleLimit;
@@ -115,7 +116,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
- @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler,
+ @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@@ -161,7 +162,12 @@ public class CoordinatorDynamicConfig
);
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
- this.useBatchedSegmentSampler = useBatchedSegmentSampler;
+ if (useBatchedSegmentSampler == null) {
+ this.useBatchedSegmentSampler = Builder.DEFAULT_USE_BATCHED_SEGMENT_SAMPLER;
+ } else {
+ this.useBatchedSegmentSampler = useBatchedSegmentSampler;
+ }
+
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@@ -276,6 +282,7 @@ public class CoordinatorDynamicConfig
return percentOfSegmentsToConsiderPerMove;
}
+ @Deprecated
@JsonProperty
public boolean useBatchedSegmentSampler()
{
@@ -517,7 +524,7 @@ public class CoordinatorDynamicConfig
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
- private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
+ private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = true;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
@@ -557,7 +564,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
- @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
+ @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@@ -627,6 +634,7 @@ public class CoordinatorDynamicConfig
return this;
}
+ @Deprecated
public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler)
{
this.useBatchedSegmentSampler = useBatchedSegmentSampler;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index cdd708ea9d..9495f30f2c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -101,12 +101,6 @@ public abstract class DruidCoordinatorConfig
return new Duration(15 * 60 * 1000);
}
- @Config("druid.coordinator.loadqueuepeon.repeatDelay")
- public Duration getLoadQueuePeonRepeatDelay()
- {
- return Duration.millis(50);
- }
-
@Config("druid.coordinator.loadqueuepeon.type")
public String getLoadQueuePeonType()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 2ab3762ba9..f1659b3335 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -33,7 +32,6 @@ import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
@@ -43,7 +41,7 @@ import java.util.List;
* negative meaning the interval end target will be in the future. Also, retainDuration can be ignored,
* meaning that there is no upper bound to the end interval of segments that will be killed. This action is called
* "to kill a segment".
- *
+ * <p>
* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
*/
public class KillUnusedSegments implements CoordinatorDuty
@@ -102,74 +100,69 @@ public class KillUnusedSegments implements CoordinatorDuty
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
+ // If no datasource has been specified, all are eligible for killing unused segments
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
- if (dataSourcesToKill != null &&
- dataSourcesToKill.size() > 0 &&
- (lastKillTime + period) < System.currentTimeMillis()) {
- lastKillTime = System.currentTimeMillis();
-
- for (String dataSource : dataSourcesToKill) {
- final Interval intervalToKill = findIntervalForKill(dataSource,
maxSegmentsToKill);
- if (intervalToKill != null) {
- try {
- indexingServiceClient.killUnusedSegments("coordinator-issued",
dataSource, intervalToKill);
- }
- catch (Exception ex) {
- log.error(ex, "Failed to submit kill task for dataSource [%s]",
dataSource);
- if (Thread.currentThread().isInterrupted()) {
- log.warn("skipping kill task scheduling because thread is
interrupted.");
- break;
- }
- }
- }
- }
+ final long currentTimeMillis = System.currentTimeMillis();
+ if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ log.debug("No eligible datasource to kill unused segments.");
+ } else if (lastKillTime + period > currentTimeMillis) {
+ log.debug("Skipping kill of unused segments as kill period has not
elapsed yet.");
+ } else {
+ log.debug("Killing unused segments in datasources: %s",
dataSourcesToKill);
+ lastKillTime = currentTimeMillis;
+ killUnusedSegments(dataSourcesToKill);
}
+
return params;
}
- /**
- * For a given datasource and limit of segments that can be killed in one
task, determine the interval to be
- * submitted with the kill task.
- *
- * @param dataSource dataSource whose unused segments are being killed.
- * @param limit the maximum number of segments that can be included in the
kill task.
- * @return {@link Interval} to be used in the kill task.
- */
- @VisibleForTesting
- @Nullable
- Interval findIntervalForKill(String dataSource, int limit)
+ private void killUnusedSegments(Collection<String> dataSourcesToKill)
{
- List<Interval> unusedSegmentIntervals =
- segmentsMetadataManager.getUnusedSegmentIntervals(dataSource,
getEndTimeUpperLimit(), limit);
+ int submittedTasks = 0;
+ for (String dataSource : dataSourcesToKill) {
+ final Interval intervalToKill = findIntervalForKill(dataSource);
+ if (intervalToKill == null) {
+ continue;
+ }
- if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
- return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
- } else {
- return null;
+ try {
+ indexingServiceClient.killUnusedSegments("coordinator-issued",
dataSource, intervalToKill);
+ ++submittedTasks;
+ }
+ catch (Exception ex) {
+ log.error(ex, "Failed to submit kill task for dataSource [%s]",
dataSource);
+ if (Thread.currentThread().isInterrupted()) {
+ log.warn("skipping kill task scheduling because thread is
interrupted.");
+ break;
+ }
+ }
}
+
+ log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks);
}
/**
- * Calculate the {@link DateTime} that wil form the upper bound when looking
for segments that are
- * eligible to be killed. If ignoreDurationToRetain is true, we have no
upper bound and return a DateTime object
- * for "max" time that works when comparing date strings.
- *
- * @return {@link DateTime} representing the upper bound time used when
looking for segments to kill.
+ * Calculates the interval for which segments are to be killed in a
datasource.
*/
- @VisibleForTesting
- DateTime getEndTimeUpperLimit()
+ private Interval findIntervalForKill(String dataSource)
{
- return ignoreRetainDuration
- ? DateTimes.COMPARE_DATE_AS_STRING_MAX
- : DateTimes.nowUtc().minus(retainDuration);
- }
+ final DateTime maxEndTime = ignoreRetainDuration
+ ? DateTimes.COMPARE_DATE_AS_STRING_MAX
+ : DateTimes.nowUtc().minus(retainDuration);
- @VisibleForTesting
- Long getRetainDuration()
- {
- return retainDuration;
+ List<Interval> unusedSegmentIntervals = segmentsMetadataManager
+ .getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill);
+
+ if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
+ return null;
+ } else if (unusedSegmentIntervals.size() == 1) {
+ return unusedSegmentIntervals.get(0);
+ } else {
+ return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
+ }
}
+
}
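
As a reading aid, a minimal sketch of the coordinator settings this duty reads, expressed
with the test config builder that appears later in this patch. The durations and the
segmentsMetadataManager / indexingServiceClient variables are illustrative assumptions,
not values taken from the patch:

    // Sketch only: the duty runs at most once per kill period, considers only
    // unused segments ending earlier than (now - durationToRetain) unless that
    // bound is ignored, and kills at most maxSegments per datasource per run.
    DruidCoordinatorConfig killConfig =
        new TestDruidCoordinatorConfig.Builder()
            .withCoordinatorIndexingPeriod(Duration.standardMinutes(30))
            .withCoordinatorKillPeriod(Duration.standardDays(1))
            .withCoordinatorKillDurationToRetain(Duration.standardDays(90))
            .withCoordinatorKillMaxSegments(100)
            .withCoordinatorKillIgnoreDurationToRetain(false)
            .build();
    KillUnusedSegments duty =
        new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, killConfig);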
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index a7c594fe09..e2bb7a816b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -238,7 +238,7 @@ public class BalanceSegmentsTest
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
- 100.0
+ 2
)
).andReturn(
ImmutableList.of(
@@ -247,7 +247,7 @@ public class BalanceSegmentsTest
).iterator()
);
- EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyDouble()))
+ EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
@@ -297,7 +297,7 @@ public class BalanceSegmentsTest
DruidCoordinatorRuntimeParams params =
setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L,
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad());
+ Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad());
}
/**
@@ -316,7 +316,7 @@ public class BalanceSegmentsTest
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(
- strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyDouble()))
+ strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment2),
@@ -367,7 +367,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyDouble()))
+ EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new
BalancerSegmentHolder(druidServer1, segment1)).iterator())
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(() -> {
@@ -403,7 +403,7 @@ public class BalanceSegmentsTest
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyDouble()))
+ EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new
BalancerSegmentHolder(druidServer1, segment1)).iterator())
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(),
EasyMock.anyObject()))
@@ -592,6 +592,7 @@ public class BalanceSegmentsTest
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
+ .withUseBatchedSegmentSampler(false)
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
@@ -788,7 +789,7 @@ public class BalanceSegmentsTest
).andReturn(
ImmutableList.of(new BalancerSegmentHolder(druidServer2,
segment2)).iterator()
);
- EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyDouble()))
+ EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new
BalancerSegmentHolder(druidServer1, segment1)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(),
EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 926bccc53e..4538bc867b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -164,7 +164,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
sourceLoadQueueChildrenCache = new PathChildrenCache(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index cfbe2b7de9..db264e55e0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(7776000000L,
config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000),
config.getLoadTimeoutDelay());
- Assert.assertEquals(Duration.millis(50),
config.getLoadQueuePeonRepeatDelay());
Assert.assertTrue(config.getCompactionSkipLockedIntervals());
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
Assert.assertEquals("http", config.getLoadQueuePeonType());
@@ -76,7 +75,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(new Duration("PT1s"),
config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
- Assert.assertEquals(Duration.millis(100),
config.getLoadQueuePeonRepeatDelay());
Assert.assertFalse(config.getCompactionSkipLockedIntervals());
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 980c336aa2..a0502d9544 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -148,7 +148,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
.withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
- .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
pathChildrenCache = new PathChildrenCache(
@@ -833,7 +832,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCompactionSkippedLockedIntervals(false)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index a1a1f06e05..aed032b196 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -75,7 +75,6 @@ public class HttpLoadQueuePeonTest
final TestDruidCoordinatorConfig config = new
TestDruidCoordinatorConfig.Builder()
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.withHttpLoadQueuePeonBatchSize(2)
.build();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 6410f9e5ef..3929aa64d5 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -90,7 +90,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.singleThreaded("test_load_queue_peon-%d"),
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(Duration.millis(0))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
@@ -281,7 +280,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
@@ -322,7 +320,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index 9abcb105c3..a7b92eaf8d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -40,7 +40,6 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
- .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 70086abd77..4181ba2bbe 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -19,173 +19,65 @@
package org.apache.druid.server.coordinator;
-import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.client.ImmutableDruidServerTests;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ReservoirSegmentSamplerTest
{
- private ImmutableDruidServer druidServer1;
- private ImmutableDruidServer druidServer2;
- private ImmutableDruidServer druidServer3;
- private ImmutableDruidServer druidServer4;
-
- private ServerHolder holder1;
- private ServerHolder holder2;
- private ServerHolder holder3;
- private ServerHolder holder4;
-
- private DataSegment segment1;
- private DataSegment segment2;
- private DataSegment segment3;
- private DataSegment segment4;
- List<DataSegment> segments1;
- List<DataSegment> segments2;
- List<DataSegment> segments3;
- List<DataSegment> segments4;
- List<DataSegment> segments;
+
+ /**
+ * num segments = 10 x 100 days
+ */
+ private final List<DataSegment> segments =
+ CreateDataSegments.ofDatasource("wiki")
+ .forIntervals(100, Granularities.DAY)
+ .startingAt("2022-01-01")
+ .withNumPartitions(10)
+ .eachOfSizeInMb(100);
@Before
public void setUp()
{
- druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
- druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
- druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
- druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
- holder1 = EasyMock.createMock(ServerHolder.class);
- holder2 = EasyMock.createMock(ServerHolder.class);
- holder3 = EasyMock.createMock(ServerHolder.class);
- holder4 = EasyMock.createMock(ServerHolder.class);
- segment1 = EasyMock.createMock(DataSegment.class);
- segment2 = EasyMock.createMock(DataSegment.class);
- segment3 = EasyMock.createMock(DataSegment.class);
- segment4 = EasyMock.createMock(DataSegment.class);
-
- DateTime start1 = DateTimes.of("2012-01-01");
- DateTime start2 = DateTimes.of("2012-02-01");
- DateTime version = DateTimes.of("2012-03-01");
- segment1 = new DataSegment(
- "datasource1",
- new Interval(start1, start1.plusHours(1)),
- version.toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 11L
- );
- segment2 = new DataSegment(
- "datasource1",
- new Interval(start2, start2.plusHours(1)),
- version.toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 7L
- );
- segment3 = new DataSegment(
- "datasource2",
- new Interval(start1, start1.plusHours(1)),
- version.toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 4L
- );
- segment4 = new DataSegment(
- "datasource2",
- new Interval(start2, start2.plusHours(1)),
- version.toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 8L
- );
-
- segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
-
- segments1 = Collections.singletonList(segment1);
- segments2 = Collections.singletonList(segment2);
- segments3 = Collections.singletonList(segment3);
- segments4 = Collections.singletonList(segment4);
}
- //checks if every segment is selected at least once out of 5000 trials
+ //checks if every segment is selected at least once out of 50 trials
@Test
- public void getRandomBalancerSegmentHolderTest()
+ public void testEverySegmentGetsPickedAtleastOnce()
{
- int iterations = 5000;
-
-
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
- EasyMock.replay(druidServer1);
-
-
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
- EasyMock.replay(druidServer2);
-
-
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
- EasyMock.replay(druidServer3);
-
-
EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
- EasyMock.replay(druidServer4);
-
- // Have to use anyTimes() because the number of times a segment on a given
server is chosen is indetermistic.
- EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
- EasyMock.replay(holder1);
- EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
- EasyMock.replay(holder2);
- EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
- EasyMock.replay(holder3);
- EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
- EasyMock.replay(holder4);
-
- List<ServerHolder> holderList = new ArrayList<>();
- holderList.add(holder1);
- holderList.add(holder2);
- holderList.add(holder3);
- holderList.add(holder4);
+ int iterations = 50;
+ final List<ServerHolder> servers = Arrays.asList(
+ createHistorical("server1", segments.get(0)),
+ createHistorical("server2", segments.get(1)),
+ createHistorical("server3", segments.get(2)),
+ createHistorical("server4", segments.get(3))
+ );
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < iterations; i++) {
// due to the pseudo-randomness of this method, we may not select a
segment every single time no matter what.
- segmentCountMap.put(
- ReservoirSegmentSampler.getRandomBalancerSegmentHolders(holderList,
Collections.emptySet(), 1).get(0).getSegment(),
- 1
+ segmentCountMap.compute(
+ ReservoirSegmentSampler
+ .getRandomBalancerSegmentHolders(servers,
Collections.emptySet(), 1)
+ .get(0).getSegment(),
+ (segment, count) -> count == null ? 1 : count + 1
);
}
- for (DataSegment segment : segments) {
- Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
- }
-
- EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
- EasyMock.verify(holder1, holder2, holder3, holder4);
+ // Verify that each segment has been chosen at least once
+ Assert.assertEquals(4, segmentCountMap.size());
}
/**
@@ -195,56 +87,149 @@ public class ReservoirSegmentSamplerTest
@Test
public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit()
{
- int iterations = 5000;
-
-
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
- EasyMock.replay(druidServer1);
-
-
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
- EasyMock.replay(druidServer2);
-
-
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
- ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
- EasyMock.replay(druidServer3);
-
- ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
- EasyMock.replay(druidServer4);
-
- // Have to use anyTimes() because the number of times a segment on a given
server is chosen is indetermistic.
- EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
- EasyMock.replay(holder1);
- EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
- EasyMock.replay(holder2);
- EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
- EasyMock.replay(holder3);
- // We only run getServer() each time we calculate the limit on segments to
consider. Always 5k
- EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000);
- EasyMock.replay(holder4);
-
- List<ServerHolder> holderList = new ArrayList<>();
- holderList.add(holder1);
- holderList.add(holder2);
- holderList.add(holder3);
- holderList.add(holder4);
+ int iterations = 50;
+
+ final DataSegment excludedSegment = segments.get(3);
+ final List<ServerHolder> servers = Arrays.asList(
+ createHistorical("server1", segments.get(0)),
+ createHistorical("server2", segments.get(1)),
+ createHistorical("server3", segments.get(2)),
+ createHistorical("server4", excludedSegment)
+ );
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
+
+ final double percentOfSegmentsToConsider = 75.0;
for (int i = 0; i < iterations; i++) {
- segmentCountMap.put(
- ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList,
Collections.emptySet(), 75).getSegment(), 1
+ segmentCountMap.compute(
+ ReservoirSegmentSampler
+ .getRandomBalancerSegmentHolder(servers, Collections.emptySet(),
percentOfSegmentsToConsider)
+ .getSegment(),
+ (segment, count) -> count == null ? 1 : count + 1
);
}
- for (DataSegment segment : segments) {
- if (!segment.equals(segment4)) {
- Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
- } else {
- Assert.assertNull(segmentCountMap.get(segment));
+ // Verify that the segment on server4 is never chosen because of limit
+ Assert.assertFalse(segmentCountMap.containsKey(excludedSegment));
+ Assert.assertEquals(3, segmentCountMap.size());
+ }
+
+ @Test
+ public void testSegmentsOnBrokersAreIgnored()
+ {
+ final ServerHolder historical = createHistorical("hist1", segments.get(0),
segments.get(1));
+
+ final ServerHolder broker = new ServerHolder(
+ new DruidServer("broker1", "broker1", null, 1000, ServerType.BROKER,
null, 1)
+ .addDataSegment(segments.get(2))
+ .addDataSegment(segments.get(3))
+ .toImmutableDruidServer(),
+ new LoadQueuePeonTester()
+ );
+
+ // Try to pick all the segments on the servers
+ List<BalancerSegmentHolder> pickedSegments =
ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
+ Arrays.asList(historical, broker),
+ Collections.emptySet(),
+ 10
+ );
+
+ // Verify that only the segments on the historical are picked
+ Assert.assertEquals(2, pickedSegments.size());
+ for (BalancerSegmentHolder holder : pickedSegments) {
+ Assert.assertEquals(historical.getServer(), holder.getFromServer());
+ }
+ }
+
+ @Test
+ public void testBroadcastSegmentsAreIgnored()
+ {
+ // num segments = 1 x 4 days
+ final String broadcastDatasource = "ds_broadcast";
+ final List<DataSegment> broadcastSegments
+ = CreateDataSegments.ofDatasource(broadcastDatasource)
+ .forIntervals(4, Granularities.DAY)
+ .startingAt("2022-01-01")
+ .withNumPartitions(1)
+ .eachOfSizeInMb(100);
+
+ final List<ServerHolder> servers = Arrays.asList(
+ createHistorical("server1", broadcastSegments.toArray(new
DataSegment[0])),
+ createHistorical("server2", segments.get(0), segments.get(1))
+ );
+
+ // Try to pick all the segments on the servers
+ List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler
+ .getRandomBalancerSegmentHolders(servers,
Collections.singleton(broadcastDatasource), 10);
+
+ // Verify that none of the broadcast segments are picked
+ Assert.assertEquals(2, pickedSegments.size());
+ for (BalancerSegmentHolder holder : pickedSegments) {
+ Assert.assertNotEquals(broadcastDatasource,
holder.getSegment().getDataSource());
+ }
+ }
+
+ @Test(timeout = 60_000)
+ public void testNumberOfIterationsToCycleThroughAllSegments()
+ {
+ // The number of runs required for each sample percentage
+ // remains more or less fixed, even with a larger number of segments
+ final int[] samplePercentages = {100, 50, 10, 5, 1};
+ final int[] expectedIterations = {1, 20, 100, 200, 1000};
+
+ final int[] totalObservedIterations = new int[5];
+ for (int i = 0; i < 50; ++i) {
+ for (int j = 0; j < samplePercentages.length; ++j) {
+ totalObservedIterations[j] +=
countMinRunsWithSamplePercent(samplePercentages[j]);
+ }
+ }
+
+ for (int j = 0; j < samplePercentages.length; ++j) {
+ double avgObservedIterations = totalObservedIterations[j] / 50.0;
+ Assert.assertTrue(avgObservedIterations <= expectedIterations[j]);
+ }
+
+ }
+
+ /**
+ * Returns the minimum number of iterations of the reservoir sampling
required
+ * to pick each segment atleast once.
+ * <p>
+ * {@code k = sampleSize = totalNumSegments * samplePercentage}
+ */
+ private int countMinRunsWithSamplePercent(int samplePercentage)
+ {
+ final int numSegments = segments.size();
+ final List<ServerHolder> servers = Arrays.asList(
+ createHistorical("server1", segments.subList(0, numSegments /
2).toArray(new DataSegment[0])),
+ createHistorical("server2", segments.subList(numSegments / 2,
numSegments).toArray(new DataSegment[0]))
+ );
+
+ final Set<DataSegment> pickedSegments = new HashSet<>();
+
+ int sampleSize = (int) (numSegments * samplePercentage / 100.0);
+
+ int numIterations = 1;
+ for (; numIterations < 10000; ++numIterations) {
+ ReservoirSegmentSampler
+ .getRandomBalancerSegmentHolders(servers, Collections.emptySet(),
sampleSize)
+ .forEach(holder -> pickedSegments.add(holder.getSegment()));
+
+ if (pickedSegments.size() >= numSegments) {
+ break;
}
}
- EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
- EasyMock.verify(holder1, holder2, holder3, holder4);
+ return numIterations;
+ }
+
+ private ServerHolder createHistorical(String serverName, DataSegment...
loadedSegments)
+ {
+ final DruidServer server =
+ new DruidServer(serverName, serverName, null, 100000,
ServerType.HISTORICAL, "normal", 1);
+ for (DataSegment segment : loadedSegments) {
+ server.addDataSegment(segment);
+ }
+ return new ServerHolder(server.toImmutableDruidServer(), new
LoadQueuePeonTester());
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 8153dd7942..271bda1131 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -39,7 +39,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private final Duration coordinatorRuleKillDurationToRetain;
private final Duration coordinatorDatasourceKillPeriod;
private final Duration coordinatorDatasourceKillDurationToRetain;
- private final Duration loadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
private final boolean compactionSkipLockedIntervals;
private final boolean coordinatorKillIgnoreDurationToRetain;
@@ -67,7 +66,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
- Duration loadQueuePeonRepeatDelay,
boolean compactionSkipLockedIntervals,
boolean coordinatorKillIgnoreDurationToRetain,
String loadQueuePeonType,
@@ -94,7 +92,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain =
coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
- this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
this.coordinatorKillIgnoreDurationToRetain =
coordinatorKillIgnoreDurationToRetain;
this.loadQueuePeonType = loadQueuePeonType;
@@ -206,12 +203,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
return loadTimeoutDelay == null ? super.getLoadTimeoutDelay() :
loadTimeoutDelay;
}
- @Override
- public Duration getLoadQueuePeonRepeatDelay()
- {
- return loadQueuePeonRepeatDelay;
- }
-
@Override
public boolean getCompactionSkipLockedIntervals()
{
@@ -272,7 +263,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD =
new Duration("PT86400s");
private static final Duration
DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new
Duration("PT7776000s");
private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15
* 60 * 1000);
- private static final Duration DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY =
Duration.millis(50);
private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator";
private static final int
DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2;
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY =
Duration.millis(60000);
@@ -299,7 +289,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private Duration coordinatorDatasourceKillPeriod;
private Duration coordinatorDatasourceKillDurationToRetain;
private Duration loadTimeoutDelay;
- private Duration loadQueuePeonRepeatDelay;
private String loadQueuePeonType;
private Duration httpLoadQueuePeonRepeatDelay;
private Integer curatorLoadQueuePeonNumCallbackThreads;
@@ -409,12 +398,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
return this;
}
- public Builder withLoadQueuePeonRepeatDelay(Duration
loadQueuePeonRepeatDelay)
- {
- this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
- return this;
- }
-
public Builder withLoadQueuePeonType(String loadQueuePeonType)
{
this.loadQueuePeonType = loadQueuePeonType;
@@ -483,7 +466,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
coordinatorDatasourceKillPeriod == null ?
DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
coordinatorDatasourceKillDurationToRetain == null ?
DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN :
coordinatorDatasourceKillDurationToRetain,
coordinatorKillMaxSegments == null ?
DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
- loadQueuePeonRepeatDelay == null ?
DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY : loadQueuePeonRepeatDelay,
compactionSkippedLockedIntervals == null ?
DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
coordinatorKillIgnoreDurationToRetain == null ?
DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN :
coordinatorKillIgnoreDurationToRetain,
loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE :
loadQueuePeonType,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 48d730cd7c..ed7fea5aaf 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -20,281 +20,221 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
-import org.easymock.EasyMock;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
-import org.junit.Assert;
+import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.mockito.Answers;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.Set;
+import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
/**
+ *
*/
-@RunWith(Enclosed.class)
+@RunWith(MockitoJUnitRunner.class)
public class KillUnusedSegmentsTest
{
- /**
- * Standing up new tests with mocks was easier than trying to move the
existing tests to use mocks for consistency.
- * In the future, if all tests are moved to use the same structure, this
inner static class can be gotten rid of.
- */
- @RunWith(MockitoJUnitRunner.class)
- public static class MockedTest
+ private static final int MAX_SEGMENTS_TO_KILL = 10;
+ private static final Duration COORDINATOR_KILL_PERIOD =
Duration.standardMinutes(2);
+ private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
+ private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
+
+ @Mock
+ private SegmentsMetadataManager segmentsMetadataManager;
+ @Mock
+ private IndexingServiceClient indexingServiceClient;
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private DruidCoordinatorConfig config;
+
+ @Mock
+ private DruidCoordinatorRuntimeParams params;
+ @Mock
+ private CoordinatorDynamicConfig coordinatorDynamicConfig;
+
+ private DataSegment yearOldSegment;
+ private DataSegment monthOldSegment;
+ private DataSegment dayOldSegment;
+ private DataSegment hourOldSegment;
+ private DataSegment nextDaySegment;
+ private DataSegment nextMonthSegment;
+
+ private KillUnusedSegments target;
+
+ @Before
+ public void setup()
{
- private static final Set<String> ALL_DATASOURCES = ImmutableSet.of("DS1",
"DS2", "DS3");
- private static final int MAX_SEGMENTS_TO_KILL = 10;
- private static final Duration COORDINATOR_KILL_PERIOD =
Duration.standardMinutes(2);
- private static final Duration DURATION_TO_RETAIN =
Duration.standardDays(1);
- private static final Duration INDEXING_PERIOD =
Duration.standardMinutes(1);
-
- @Mock
- private SegmentsMetadataManager segmentsMetadataManager;
- @Mock
- private IndexingServiceClient indexingServiceClient;
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private DruidCoordinatorConfig config;
-
- @Mock
- private DruidCoordinatorRuntimeParams params;
- @Mock
- private CoordinatorDynamicConfig coordinatorDynamicConfig;
- private KillUnusedSegments target;
-
- @Before
- public void setup()
- {
-
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-
Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames();
-
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
- target = new KillUnusedSegments(segmentsMetadataManager,
indexingServiceClient, config);
- }
- @Test
- public void testRunWihNoIntervalShouldNotKillAnySegments()
- {
- target.run(params);
- Mockito.verify(indexingServiceClient, Mockito.never())
- .killUnusedSegments(anyString(), anyString(),
any(Interval.class));
- }
-
- @Test
- public void
testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
- {
-
Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1"));
- target.run(params);
- Mockito.verify(indexingServiceClient, Mockito.never())
- .killUnusedSegments(anyString(), anyString(),
any(Interval.class));
- }
+
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
+
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
+
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
+
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
+
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
+
+ Mockito.doReturn(Collections.singleton("DS1"))
+
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+
+ final DateTime now = DateTimes.nowUtc();
+
+ yearOldSegment = createSegmentWithEnd(now.minusDays(365));
+ monthOldSegment = createSegmentWithEnd(now.minusDays(30));
+ dayOldSegment = createSegmentWithEnd(now.minusDays(1));
+ hourOldSegment = createSegmentWithEnd(now.minusHours(1));
+ nextDaySegment = createSegmentWithEnd(now.plusDays(1));
+ nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
+
+ final List<DataSegment> unusedSegments = ImmutableList.of(
+ yearOldSegment,
+ monthOldSegment,
+ dayOldSegment,
+ hourOldSegment,
+ nextDaySegment,
+ nextMonthSegment
+ );
+
+ Mockito.when(
+ segmentsMetadataManager.getUnusedSegmentIntervals(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt()
+ )
+ ).thenAnswer(invocation -> {
+ DateTime maxEndTime = invocation.getArgument(1);
+ List<Interval> unusedIntervals =
+ unusedSegments.stream()
+ .map(DataSegment::getInterval)
+ .filter(i -> i.getEnd().isBefore(maxEndTime))
+ .collect(Collectors.toList());
+
+ int limit = invocation.getArgument(2);
+ return unusedIntervals.size() <= limit ? unusedIntervals :
unusedIntervals.subList(0, limit);
+ });
+
+ target = new KillUnusedSegments(segmentsMetadataManager,
indexingServiceClient, config);
}
- public static class FindIntervalsTest
+ @Test
+ public void testRunWithNoIntervalShouldNotKillAnySegments()
{
- @Test
- public void testFindIntervalForKill()
- {
- testFindIntervalForKill(null, null);
- testFindIntervalForKill(ImmutableList.of(), null);
-
- testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")),
Intervals.of("2014/2015"));
-
- testFindIntervalForKill(
- ImmutableList.of(Intervals.of("2014/2015"),
Intervals.of("2016/2017")),
- Intervals.of("2014/2017")
- );
-
- testFindIntervalForKill(
- ImmutableList.of(Intervals.of("2014/2015"),
Intervals.of("2015/2016")),
- Intervals.of("2014/2016")
- );
-
- testFindIntervalForKill(
- ImmutableList.of(Intervals.of("2015/2016"),
Intervals.of("2014/2015")),
- Intervals.of("2014/2016")
- );
-
- testFindIntervalForKill(
- ImmutableList.of(Intervals.of("2015/2017"),
Intervals.of("2014/2016")),
- Intervals.of("2014/2017")
- );
-
- testFindIntervalForKill(
- ImmutableList.of(
- Intervals.of("2015/2019"),
- Intervals.of("2014/2016"),
- Intervals.of("2018/2020")
- ),
- Intervals.of("2014/2020")
- );
-
- testFindIntervalForKill(
- ImmutableList.of(
- Intervals.of("2015/2019"),
- Intervals.of("2014/2016"),
- Intervals.of("2018/2020"),
- Intervals.of("2021/2022")
- ),
- Intervals.of("2014/2022")
- );
- }
-
- private void testFindIntervalForKill(List<Interval> segmentIntervals,
Interval expected)
- {
- SegmentsMetadataManager segmentsMetadataManager =
EasyMock.createMock(SegmentsMetadataManager.class);
- EasyMock.expect(
- segmentsMetadataManager.getUnusedSegmentIntervals(
- EasyMock.anyString(),
- EasyMock.anyObject(DateTime.class),
- EasyMock.anyInt()
- )
- ).andReturn(segmentIntervals);
- EasyMock.replay(segmentsMetadataManager);
- IndexingServiceClient indexingServiceClient =
EasyMock.createMock(IndexingServiceClient.class);
+
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt()
+ );
+
+ target.run(params);
+ Mockito.verify(indexingServiceClient, Mockito.never())
+ .killUnusedSegments(anyString(), anyString(), any(Interval.class));
+ }
- KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
- segmentsMetadataManager,
- indexingServiceClient,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
+ @Test
+ public void
testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
+ {
+ Mockito.doReturn(Duration.standardDays(400))
+ .when(config).getCoordinatorKillDurationToRetain();
+ target = new KillUnusedSegments(segmentsMetadataManager,
indexingServiceClient, config);
+
+ // No unused segment is older than the retention period
+ target.run(params);
+ Mockito.verify(indexingServiceClient, Mockito.never())
+ .killUnusedSegments(anyString(), anyString(), any(Interval.class));
+ }
- Assert.assertEquals(
- expected,
- unusedSegmentsKiller.findIntervalForKill("test", 10000)
- );
- }
+ @Test
+ public void testDurationToRetain()
+ {
+ // Only segments more than a day old are killed
+ Interval expectedKillInterval = new Interval(
+ yearOldSegment.getInterval().getStart(),
+ dayOldSegment.getInterval().getEnd()
+ );
+ runAndVerifyKillInterval(expectedKillInterval);
+ }
- /**
- * Test that retainDuration is properly set based on the value available
in the
- * Coordinator config. Positive and Negative durations should work as well
as
- * null, if and only if ignoreDurationToRetain is true.
- */
- @Test
- public void testRetainDurationValues()
- {
- // Positive duration to retain
- KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
- null,
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
- Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(),
unusedSegmentsKiller.getRetainDuration());
+ @Test
+ public void testNegativeDurationToRetain()
+ {
+ // Duration to retain = -1 day, reinit target for config to take effect
+ Mockito.doReturn(DURATION_TO_RETAIN.negated())
+ .when(config).getCoordinatorKillDurationToRetain();
+ target = new KillUnusedSegments(segmentsMetadataManager,
indexingServiceClient, config);
+
+ // Segments upto 1 day in the future are killed
+ Interval expectedKillInterval = new Interval(
+ yearOldSegment.getInterval().getStart(),
+ nextDaySegment.getInterval().getEnd()
+ );
+ runAndVerifyKillInterval(expectedKillInterval);
+ }
- // Negative duration to retain
- unusedSegmentsKiller = new KillUnusedSegments(
- null,
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
- Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(),
unusedSegmentsKiller.getRetainDuration());
- }
+ @Test
+ public void testIgnoreDurationToRetain()
+ {
+ Mockito.doReturn(true)
+ .when(config).getCoordinatorKillIgnoreDurationToRetain();
+ target = new KillUnusedSegments(segmentsMetadataManager,
indexingServiceClient, config);
+
+ // All future and past unused segments are killed
+ Interval expectedKillInterval = new Interval(
+ yearOldSegment.getInterval().getStart(),
+ nextMonthSegment.getInterval().getEnd()
+ );
+ runAndVerifyKillInterval(expectedKillInterval);
+ }
- /**
- * Test that the end time upper limit is properly computated for both
positive and
- * negative durations. Also ensure that if durationToRetain is to be
ignored, that
- * the upper limit is {@link DateTime} max time.
- */
- @Test
- public void testGetEndTimeUpperLimit()
- {
- // If ignoreDurationToRetain is true, ignore the value configured for
durationToRetain and return 9999-12-31T23:59
- KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
- null,
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(true)
- .build()
- );
- Assert.assertEquals(
- DateTimes.COMPARE_DATE_AS_STRING_MAX,
- unusedSegmentsKiller.getEndTimeUpperLimit()
- );
+ @Test
+ public void testMaxSegmentsToKill()
+ {
+ Mockito.doReturn(1)
+ .when(config).getCoordinatorKillMaxSegments();
+ target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
- // Testing a negative durationToRetain period returns proper date in future
- unusedSegmentsKiller = new KillUnusedSegments(
- null,
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
+ // Only 1 unused segment is killed
+ runAndVerifyKillInterval(yearOldSegment.getInterval());
+ }
- DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
- Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
+ private void runAndVerifyKillInterval(Interval expectedKillInterval)
+ {
+ target.run(params);
+ Mockito.verify(indexingServiceClient, Mockito.times(1)).killUnusedSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("DS1"),
+ ArgumentMatchers.eq(expectedKillInterval)
+ );
+ }
- // Testing a positive durationToRetain period returns expected value in the past
- unusedSegmentsKiller = new KillUnusedSegments(
- null,
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
- .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
- .withCoordinatorKillMaxSegments(1000)
- .withLoadQueuePeonRepeatDelay(Duration.ZERO)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
- expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
- Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
- }
+ private DataSegment createSegmentWithEnd(DateTime endTime)
+ {
+ return new DataSegment(
+ "DS1",
+ new Interval(Period.days(1), endTime),
+ DateTimes.nowUtc().toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ 1,
+ 0
+ );
}
}
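
For readers following the new tests above: the kill interval they assert runs from the
oldest unused segment up to an end-time upper limit derived from durationToRetain. A
minimal sketch of that upper limit follows; it is illustrative only, not the production
KillUnusedSegments code, and the method name is hypothetical:

    import org.apache.druid.java.util.common.DateTimes;
    import org.joda.time.DateTime;
    import org.joda.time.Duration;

    // Upper bound on segment end times eligible for kill.
    // ignoreDurationToRetain=true makes every unused segment eligible, including future ones;
    // a negative durationToRetain pushes the limit into the future (see testNegativeDurationToRetain).
    static DateTime endTimeUpperLimit(Duration durationToRetain, boolean ignoreDurationToRetain)
    {
      return ignoreDurationToRetain
             ? DateTimes.COMPARE_DATE_AS_STRING_MAX
             : DateTimes.nowUtc().minus(durationToRetain);
    }

Segments whose intervals fall entirely before this limit end up inside the kill interval
submitted to the indexing service, which is what runAndVerifyKillInterval checks against
the mocked indexingServiceClient.
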
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 28b84f62e9..01308d82e7 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -453,7 +453,6 @@ public class CoordinatorSimulationBuilder
.withCoordinatorStartDelay(new Duration(1L))
.withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
- .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
.withLoadQueuePeonType("http")
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 7248903619..707ea1d0f9 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -78,6 +78,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -100,6 +101,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -122,6 +124,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -144,6 +147,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -158,7 +162,10 @@ public class CoordinatorDynamicConfigTest
Integer.MAX_VALUE
);
- actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
+ actual = CoordinatorDynamicConfig.builder()
+ .withPercentOfSegmentsToConsiderPerMove(10)
+ .withUseBatchedSegmentSampler(false)
+ .build(actual);
assertConfig(
actual,
1,
@@ -166,6 +173,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
+ false,
1,
1,
2,
@@ -188,6 +196,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
+ false,
1,
1,
2,
@@ -210,6 +219,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
+ false,
1,
1,
2,
@@ -315,7 +325,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
- 1,
+ true, 1,
1,
2,
true,
@@ -337,6 +347,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
+ true,
1,
1,
2,
@@ -359,6 +370,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
+ true,
1,
1,
2,
@@ -407,6 +419,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -525,6 +538,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
+ true,
1,
1,
2,
@@ -574,6 +588,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -638,6 +653,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
+ true,
1,
1,
2,
@@ -665,6 +681,7 @@ public class CoordinatorDynamicConfigTest
100,
5,
100,
+ true,
15,
10,
1,
@@ -695,6 +712,7 @@ public class CoordinatorDynamicConfigTest
100,
5,
100,
+ true,
15,
10,
1,
@@ -785,6 +803,7 @@ public class CoordinatorDynamicConfigTest
int expectedMergeSegmentsLimit,
int expectedMaxSegmentsToMove,
int expectedPercentOfSegmentsToConsiderPerMove,
+ boolean expectedUseBatchedSegmentSampler,
int expectedReplicantLifetime,
int expectedReplicationThrottleLimit,
int expectedBalancerComputeThreads,
@@ -807,6 +826,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
+ Assert.assertEquals(expectedUseBatchedSegmentSampler, config.useBatchedSegmentSampler());
Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());
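
Since the flag now defaults to true, a deployment or test that still needs the legacy
non-batched sampler has to opt out explicitly while useBatchedSegmentSampler remains
deprecated. A minimal sketch using the same builder exercised above; the variable names
are illustrative and the no-argument build() is assumed to produce default values, as in
other tests:

    import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

    // Start from the current dynamic config and override only the sampler flag.
    CoordinatorDynamicConfig current = CoordinatorDynamicConfig.builder().build();
    CoordinatorDynamicConfig legacySampler = CoordinatorDynamicConfig.builder()
        .withUseBatchedSegmentSampler(false)  // deprecated; defaults to true after this change
        .build(current);
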
diff --git a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index 1d148e6b18..eeb25db09c 100644
--- a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++ b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -197,7 +197,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
{
name: 'useBatchedSegmentSampler',
type: 'boolean',
- defaultValue: false,
+ defaultValue: true,
info: (
<>
Boolean flag for whether or not we should use the Reservoir Sampling
with a reservoir of
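
For context on the info text above: the batched sampler picks several candidate segments
in one pass using reservoir sampling. The snippet below is only the textbook algorithm
the flag's name refers to, not Druid's ReservoirSegmentSampler:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // Textbook reservoir sampling: choose k items uniformly at random from a stream in one pass.
    static <T> List<T> reservoirSample(Iterable<T> items, int k, Random random)
    {
      final List<T> reservoir = new ArrayList<>(k);
      int seen = 0;
      for (T item : items) {
        seen++;
        if (reservoir.size() < k) {
          reservoir.add(item);
        } else {
          final int slot = random.nextInt(seen);
          if (slot < k) {
            reservoir.set(slot, item);  // keep the new item with probability k / seen
          }
        }
      }
      return reservoir;
    }
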
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]