This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 786e772d26 Remove config
`druid.coordinator.compaction.skipLockedIntervals` (#14807)
786e772d26 is described below
commit 786e772d26c4358d02fff8c2c1452ecde471d77a
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 14 12:31:15 2023 +0530
Remove config `druid.coordinator.compaction.skipLockedIntervals` (#14807)
The value of `druid.coordinator.compaction.skipLockedIntervals` should
always be `true`.
---
.../druid/server/coordinator/DruidCoordinator.java | 2 +-
.../server/coordinator/DruidCoordinatorConfig.java | 6 --
.../server/coordinator/duty/CompactSegments.java | 17 ----
.../coordinator/DruidCoordinatorConfigTest.java | 3 -
.../server/coordinator/DruidCoordinatorTest.java | 17 +---
.../coordinator/TestDruidCoordinatorConfig.java | 18 ----
.../coordinator/duty/CompactSegmentsTest.java | 106 +++++----------------
7 files changed, 32 insertions(+), 137 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 272de19010..09d8e2b687 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -612,7 +612,7 @@ public class DruidCoordinator
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups =
getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
- return new CompactSegments(config, compactionSegmentSearchPolicy,
overlordClient);
+ return new CompactSegments(compactionSegmentSearchPolicy,
overlordClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn(
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 9495f30f2c..c28da1f439 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -131,10 +131,4 @@ public abstract class DruidCoordinatorConfig
return 1;
}
- @Config("druid.coordinator.compaction.skipLockedIntervals")
- public boolean getCompactionSkipLockedIntervals()
- {
- return true;
- }
-
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 71495cac12..6ba1e3919a 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -48,7 +48,6 @@ import
org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
@@ -85,7 +84,6 @@ public class CompactSegments implements CoordinatorCustomDuty
status -> null != status &&
COMPACTION_TASK_TYPE.equals(status.getType());
private final CompactionSegmentSearchPolicy policy;
- private final boolean skipLockedIntervals;
private final OverlordClient overlordClient;
// This variable is updated by the Coordinator thread executing duties and
@@ -95,23 +93,13 @@ public class CompactSegments implements
CoordinatorCustomDuty
@Inject
@JsonCreator
public CompactSegments(
- @JacksonInject DruidCoordinatorConfig config,
@JacksonInject CompactionSegmentSearchPolicy policy,
@JacksonInject OverlordClient overlordClient
)
{
this.policy = policy;
this.overlordClient = overlordClient;
- this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
resetCompactionSnapshot();
-
- LOG.info("Scheduling compaction with skipLockedIntervals [%s]",
skipLockedIntervals);
- }
-
- @VisibleForTesting
- public boolean isSkipLockedIntervals()
- {
- return skipLockedIntervals;
}
@VisibleForTesting
@@ -272,11 +260,6 @@ public class CompactSegments implements
CoordinatorCustomDuty
List<DataSourceCompactionConfig> compactionConfigs
)
{
- if (!skipLockedIntervals) {
- LOG.info("Not skipping any locked interval for Compaction");
- return new HashMap<>();
- }
-
final Map<String, Integer> minTaskPriority = compactionConfigs
.stream()
.collect(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index db264e55e0..53c2808aa7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(7776000000L,
config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000),
config.getLoadTimeoutDelay());
- Assert.assertTrue(config.getCompactionSkipLockedIntervals());
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
Assert.assertEquals("http", config.getLoadQueuePeonType());
@@ -62,7 +61,6 @@ public class DruidCoordinatorConfigTest
props.setProperty("druid.coordinator.kill.pendingSegments.on", "true");
props.setProperty("druid.coordinator.load.timeout", "PT1s");
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay",
"PT0.100s");
- props.setProperty("druid.coordinator.compaction.skipLockedIntervals",
"false");
props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true");
factory = Config.createFactory(props);
@@ -75,7 +73,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(new Duration("PT1s"),
config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
- Assert.assertFalse(config.getCompactionSkipLockedIntervals());
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
// Test negative druid.coordinator.kill.durationToRetain now that it is
valid.
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 979181619a..cf53708af0 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -733,15 +733,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void
testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{
- DruidCoordinatorConfig differentConfigUsedInCustomGroup = new
TestDruidCoordinatorConfig.Builder()
- .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
- .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
- .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
- .withCoordinatorKillMaxSegments(10)
- .withCompactionSkippedLockedIntervals(false)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build();
- CoordinatorCustomDutyGroup compactSegmentCustomGroup = new
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1),
ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null,
null)));
+ CoordinatorCustomDutyGroup compactSegmentCustomGroup = new
CoordinatorCustomDutyGroup(
+ "group1",
+ Duration.standardSeconds(1),
+ ImmutableList.of(new CompactSegments(null, null))
+ );
CoordinatorCustomDutyGroups customDutyGroups = new
CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
@@ -777,9 +773,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
// CompactSegments returned by this method should be from the Custom Duty
Group
CompactSegments duty =
coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
-
Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(),
duty.isSkipLockedIntervals());
- // We should get the CompactSegment from the custom duty group which was
created with a different config than the config in DruidCoordinator
-
Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(),
duty.isSkipLockedIntervals());
}
@Test(timeout = 3000)
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 271bda1131..d6089557c3 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -40,7 +40,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private final Duration coordinatorDatasourceKillPeriod;
private final Duration coordinatorDatasourceKillDurationToRetain;
private final int coordinatorKillMaxSegments;
- private final boolean compactionSkipLockedIntervals;
private final boolean coordinatorKillIgnoreDurationToRetain;
private final String loadQueuePeonType;
private final Duration httpLoadQueuePeonRepeatDelay;
@@ -66,7 +65,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
- boolean compactionSkipLockedIntervals,
boolean coordinatorKillIgnoreDurationToRetain,
String loadQueuePeonType,
Duration httpLoadQueuePeonRepeatDelay,
@@ -92,7 +90,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain =
coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
- this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
this.coordinatorKillIgnoreDurationToRetain =
coordinatorKillIgnoreDurationToRetain;
this.loadQueuePeonType = loadQueuePeonType;
this.httpLoadQueuePeonRepeatDelay = httpLoadQueuePeonRepeatDelay;
@@ -203,12 +200,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
return loadTimeoutDelay == null ? super.getLoadTimeoutDelay() :
loadTimeoutDelay;
}
- @Override
- public boolean getCompactionSkipLockedIntervals()
- {
- return compactionSkipLockedIntervals;
- }
-
@Override
public boolean getCoordinatorKillIgnoreDurationToRetain()
{
@@ -268,7 +259,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY =
Duration.millis(60000);
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT =
Duration.millis(300000);
private static final int DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE = 1;
- private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS =
true;
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new
Duration("PT86400s");
private static final Duration
DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
@@ -294,7 +284,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
private Integer curatorLoadQueuePeonNumCallbackThreads;
private Duration httpLoadQueuePeonHostTimeout;
private Integer httpLoadQueuePeonBatchSize;
- private Boolean compactionSkippedLockedIntervals;
private Duration coordinatorAuditKillPeriod;
private Duration coordinatorAuditKillDurationToRetain;
@@ -428,12 +417,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
return this;
}
- public Builder withCompactionSkippedLockedIntervals(boolean
compactionSkippedLockedIntervals)
- {
- this.compactionSkippedLockedIntervals = compactionSkippedLockedIntervals;
- return this;
- }
-
public Builder withCoordianatorAuditKillPeriod(Duration
coordinatorAuditKillPeriod)
{
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
@@ -466,7 +449,6 @@ public class TestDruidCoordinatorConfig extends
DruidCoordinatorConfig
coordinatorDatasourceKillPeriod == null ?
DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
coordinatorDatasourceKillDurationToRetain == null ?
DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN :
coordinatorDatasourceKillDurationToRetain,
coordinatorKillMaxSegments == null ?
DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
- compactionSkippedLockedIntervals == null ?
DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
coordinatorKillIgnoreDurationToRetain == null ?
DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN :
coordinatorKillIgnoreDurationToRetain,
loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE :
loadQueuePeonType,
httpLoadQueuePeonRepeatDelay == null ?
DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY : httpLoadQueuePeonRepeatDelay,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index e9a201c90d..1843adf9c2 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -192,7 +192,6 @@ public class CompactSegmentsTest
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments,
ImmutableMap.of());
-
Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(true);
}
private DataSegment createSegment(String dataSource, int startDay, boolean
beforeNoon, int partition)
@@ -238,12 +237,11 @@ public class CompactSegmentsTest
.addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY)
);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
String compactSegmentString =
JSON_MAPPER.writeValueAsString(compactSegments);
CompactSegments serdeCompactSegments =
JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
Assert.assertNotNull(serdeCompactSegments);
- Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(),
serdeCompactSegments.isSkipLockedIntervals());
Assert.assertSame(overlordClient,
serdeCompactSegments.getOverlordClient());
}
@@ -251,7 +249,7 @@ public class CompactSegmentsTest
public void testRun()
{
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@@ -327,7 +325,7 @@ public class CompactSegmentsTest
public void testMakeStats()
{
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots =
compactSegments.getAutoCompactionSnapshot();
@@ -421,7 +419,7 @@ public class CompactSegmentsTest
dataSources = DataSourcesSnapshot.fromUsedSegments(segments,
ImmutableMap.of());
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots =
compactSegments.getAutoCompactionSnapshot();
@@ -483,7 +481,7 @@ public class CompactSegmentsTest
public void testMakeStatsWithDeactivatedDatasource()
{
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots =
compactSegments.getAutoCompactionSnapshot();
@@ -575,7 +573,7 @@ public class CompactSegmentsTest
dataSources = DataSourcesSnapshot.fromUsedSegments(segments,
ImmutableMap.of());
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots =
compactSegments.getAutoCompactionSnapshot();
@@ -634,7 +632,7 @@ public class CompactSegmentsTest
public void testRunMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@@ -648,7 +646,7 @@ public class CompactSegmentsTest
int maxCompactionSlot = 3;
Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(),
maxCompactionSlot, true);
Assert.assertEquals(maxCompactionSlot,
stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@@ -662,7 +660,7 @@ public class CompactSegmentsTest
int maxCompactionSlot = 100;
Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(),
maxCompactionSlot, true);
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE,
stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@@ -675,7 +673,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -732,7 +730,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -781,7 +779,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -830,7 +828,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -890,7 +888,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -942,7 +940,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -991,7 +989,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1101,7 +1099,7 @@ public class CompactSegmentsTest
Mockito.when(mockClient.taskPayload(ArgumentMatchers.eq(conflictTaskId)))
.thenReturn(Futures.immediateFuture(runningConflictCompactionTaskPayload));
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1163,7 +1161,7 @@ public class CompactSegmentsTest
public void testRunParallelCompactionMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments,
createCompactionConfigs(2), 4);
Assert.assertEquals(4, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@@ -1195,7 +1193,7 @@ public class CompactSegmentsTest
// Verify that locked intervals are skipped and only one compaction task
// is submitted for dataSource_0
- CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG,
SEARCH_POLICY, overlordClient);
+ CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(1, stats.get(Stats.Compaction.SUBMITTED_TASKS));
@@ -1215,7 +1213,7 @@ public class CompactSegmentsTest
NullHandling.initializeForTests();
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1265,7 +1263,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1317,7 +1315,7 @@ public class CompactSegmentsTest
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new
CountAggregatorFactory("cnt")};
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1363,58 +1361,6 @@ public class CompactSegmentsTest
Assert.assertArrayEquals(aggregatorFactories, actual);
}
- @Test
- public void testRunWithLockedIntervalsNoSkip()
- {
-
Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(false);
-
- final TestOverlordClient overlordClient = new
TestOverlordClient(JSON_MAPPER);
-
- // Lock all intervals for all the dataSources
- final String datasource0 = DATA_SOURCE_PREFIX + 0;
- overlordClient.lockedIntervals
- .computeIfAbsent(datasource0, k -> new ArrayList<>())
- .add(Intervals.of("2017/2018"));
-
- final String datasource1 = DATA_SOURCE_PREFIX + 1;
- overlordClient.lockedIntervals
- .computeIfAbsent(datasource1, k -> new ArrayList<>())
- .add(Intervals.of("2017/2018"));
-
- final String datasource2 = DATA_SOURCE_PREFIX + 2;
- overlordClient.lockedIntervals
- .computeIfAbsent(datasource2, k -> new ArrayList<>())
- .add(Intervals.of("2017/2018"));
-
- // Verify that no locked intervals are skipped
- CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG,
SEARCH_POLICY, overlordClient);
- int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec
? 5 : 3;
- final CoordinatorRunStats stats = doCompactSegments(compactSegments,
createCompactionConfigs(1), maxTaskSlots);
- Assert.assertEquals(3, stats.get(Stats.Compaction.SUBMITTED_TASKS));
- Assert.assertEquals(3, overlordClient.submittedCompactionTasks.size());
- overlordClient.submittedCompactionTasks.forEach(task -> {
- System.out.println(task.getDataSource() + " : " +
task.getIoConfig().getInputSpec().getInterval());
- });
-
- // Verify that tasks are submitted for the latest interval of each
dataSource
- final Map<String, Interval> datasourceToInterval = new HashMap<>();
- overlordClient.submittedCompactionTasks.forEach(
- task -> datasourceToInterval.put(
- task.getDataSource(),
task.getIoConfig().getInputSpec().getInterval()));
- Assert.assertEquals(
- Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
- datasourceToInterval.get(datasource0)
- );
- Assert.assertEquals(
- Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
- datasourceToInterval.get(datasource1)
- );
- Assert.assertEquals(
- Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
- datasourceToInterval.get(datasource2)
- );
- }
-
@Test
public void testDetermineSegmentGranularityFromSegmentsToCompact()
{
@@ -1450,7 +1396,7 @@ public class CompactSegmentsTest
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1536,7 +1482,7 @@ public class CompactSegmentsTest
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1592,7 +1538,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1644,7 +1590,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
- final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
+ final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY,
mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]