kfaraz commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2904001518
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2169,13 +2222,13 @@ private List<DataSegmentPlus> retrieveSegmentsById(
}
/**
- * Finds the append segments that were covered by the given task REPLACE
locks.
- * These append segments must now be upgraded to the same version as the
segments
+ * Finds segments were covered by the given task REPLACE locks.
Review Comment:
```suggestion
* Finds segments that were covered by the given task REPLACE locks.
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -373,7 +521,7 @@ public void
test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrec
cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
- Assertions.assertEquals(4, getTotalRowCount());
Review Comment:
I think it was cleaner to use the `getTotalRowCount()` method.
Just update the contents of that method to use `cluster.runSql("SELECT
COUNT(*)")`.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -206,6 +231,129 @@ public void
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}
+ @MethodSource("getPartitionsSpec")
+ @ParameterizedTest(name = "partitionsSpec={0}")
+ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec)
throws Exception
+ {
+ configureCompaction(
+ CompactionEngine.MSQ,
+ new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null)
+ );
+ KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder =
MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null))
+
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .withIoConfig(ioConfig ->
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+ // Set up first topic and supervisor
+ final String topic1 = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic1, 1);
+ final KafkaSupervisorSpec supervisor1 =
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+ cluster.callApi().postSupervisor(supervisor1);
+
+ final int totalRowCount = publish1kRecords(topic1, true) +
publish1kRecords(topic1, false);
+ waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+ // Before compaction
+ Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+ // Create a compaction config with DAY granularity
+ InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+ InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withSkipOffsetFromLatest(Period.seconds(0))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+ .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+ WikipediaStreamEventStreamGenerator.dimensions()
+ .stream()
+
.map(StringDimensionSchema::new)
+
.collect(Collectors.toUnmodifiableList())))
+ .withTaskContext(Map.of("useConcurrentLocks", true))
+ .withIoConfig(new UserCompactionTaskIOConfig(true))
+
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+ .build();
+
+ runCompactionWithSpec(dayGranularityConfig);
+ waitForAllCompactionTasksToFinish();
+
+ pauseCompaction(dayGranularityConfig);
+ Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+ Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+ Assertions.assertEquals("2000", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+
+ verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+ // published another 1k
+ final int appendedRowCount = publish1kRecords(topic1, true);
+ indexer.latchableEmitter().flush();
+ waitUntilPublishedRecordsAreIngested(appendedRowCount);
+
+ // Tear down both topics and supervisors
+ kafkaServer.deleteTopic(topic1);
+ cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+
+ long totalUsed = overlord.latchableEmitter().getMetricValues(
Review Comment:
This may not give the correct count of used segments currently
present in the cache.
This metric is emitted after every cache sync, so the aggregated
total would be higher than the actual number of segments.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1867,36 +1874,40 @@ private Set<DataSegmentPlus>
createNewIdsOfAppendSegmentsAfterReplace(
return Collections.emptySet();
}
- // For each replace interval, find the number of core partitions and total
partitions
- final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
+ // For each replace interval, find the current partition number
final Map<Interval, Integer> intervalToCurrentPartitionNum = new
HashMap<>();
+ // if numChunkNotSupported by all segments in an interval, we can't update
the corePartitions in shardSpec
+ final Set<Interval> numChunkNotSupported = new HashSet<>();
for (DataSegment segment : replaceSegments) {
- intervalToNumCorePartitions.put(segment.getInterval(),
segment.getShardSpec().getNumCorePartitions());
-
int partitionNum = segment.getShardSpec().getPartitionNum();
intervalToCurrentPartitionNum.compute(
segment.getInterval(),
(i, value) -> value == null ? partitionNum : Math.max(value,
partitionNum)
);
+ if (!segment.isTombstone() &&
!segment.getShardSpec().canCreateNumberedPartitionChunk()) {
+ numChunkNotSupported.add(segment.getInterval());
+ }
}
// Find the segments that need to be upgraded
final String taskId = locksHeldByReplaceTask.stream()
.map(ReplaceTaskLock::getSupervisorTaskId)
.findFirst().orElse(null);
final Map<String, String> upgradeSegmentToLockVersion
- = getAppendSegmentsCommittedDuringTask(transaction, taskId);
+ = getSegmentsCoveredByTaskLock(transaction, taskId);
final List<DataSegmentPlus> segmentsToUpgrade
= retrieveSegmentsById(dataSource, transaction,
upgradeSegmentToLockVersion.keySet());
if (segmentsToUpgrade.isEmpty()) {
Review Comment:
A short comment might help here:
```suggestion
// If there is nothing to upgrade, return the replaceSegments unchanged
if (segmentsToUpgrade.isEmpty()) {
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -626,12 +735,8 @@ private void verifyNoRowsWithNestedValue(String
nestedColumn, String field, Stri
field,
value
);
- String result = cluster.callApi().onAnyBroker(b -> b.submitSqlQuery(new
ClientSqlQuery(sql, null, false, false, false, null, null)));
- List<Map<String, Object>> rows = JacksonUtils.readValue(
- new DefaultObjectMapper(),
- result.getBytes(StandardCharsets.UTF_8),
- new TypeReference<>() {}
- );
+ ClientSqlQuery clientSqlQuery = new ClientSqlQuery(sql, null, false,
false, false, null, null);
+ List<Map<String, Object>> rows = parse(cluster.callApi().onAnyBroker(b ->
b.submitSqlQuery(clientSqlQuery)));
Review Comment:
The `parse` method should not be needed. Ideally, we should simply be using
`cluster.runSql()` for all of these cases.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -588,33 +586,37 @@ static Map<QuerySegmentSpec, DataSchema>
createInputDataSchemas(
Iterable<DataSegment> segmentsNotCompletelyWithinin =
Iterables.filter(timelineSegments, s ->
!segmentProvider.interval.contains(s.getInterval()));
if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
- throw new ISE(
- "Incremental compaction doesn't allow segments not completely
within interval[%s]",
- segmentProvider.interval
- );
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "Minor compaction doesn't allow segments not
completely within interval[%s]",
+ segmentProvider.interval
+ );
}
}
if (granularitySpec == null || granularitySpec.getSegmentGranularity() ==
null) {
Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
- // if segment is already compacted in incremental compaction, they need
to be upgraded directly, supported in MSQ
- List<DataSegment> upgradeSegments = new ArrayList<>();
+ // if segment is already compacted in minor compaction, they need to be
upgraded directly, supported in MSQ
+ Set<DataSegment> segmentsToUpgrade = new HashSet<>();
// original granularity
final Map<Interval, List<DataSegment>> intervalToSegments = new
TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
for (final DataSegment dataSegment : timelineSegments) {
- if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) {
- upgradeSegments.add(dataSegment);
+ if (segmentProvider.shouldUpgradeSegment(dataSegment)) {
+ segmentsToUpgrade.add(dataSegment);
} else {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k ->
new ArrayList<>())
.add(dataSegment);
}
}
- if (!upgradeSegments.isEmpty()) {
- toolbox.getTaskActionClient().submit(new
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));
+ if (!segmentsToUpgrade.isEmpty()) {
+ log.info("Marking [%d]segments to upgrade", segmentsToUpgrade.size());
Review Comment:
I think we should include some (if not all) of the segment IDs in the log
too. It might be useful in debugging.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1972,35 @@ private Set<DataSegmentPlus>
createNewIdsOfAppendSegmentsAfterReplace(
oldSegmentMetadata.getIndexingStateFingerprint()
)
);
+ segmentsToInsert.add(dataSegment);
}
- return upgradedSegments;
+ return segmentsToInsert.stream().map(segment -> {
+ // update corePartitions in shard spec
+ Integer partitionNum =
intervalToCurrentPartitionNum.get(segment.getInterval());
+ if (!segment.isTombstone() &&
!numChunkNotSupported.contains(segment.getInterval()) && partitionNum != null) {
+ return
segment.withShardSpec(segment.getShardSpec().withCorePartitions(partitionNum +
1));
Review Comment:
This doesn't seem correct. The number of core partitions should be the total
number of segments i.e. replaceSegments + upgradedSegments.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2169,13 +2222,13 @@ private List<DataSegmentPlus> retrieveSegmentsById(
}
/**
- * Finds the append segments that were covered by the given task REPLACE
locks.
- * These append segments must now be upgraded to the same version as the
segments
+ * Finds segments were covered by the given task REPLACE locks.
+ * These segments must now be upgraded to the same version as the segments
* being committed by this replace task.
*
* @return Map from append Segment ID to REPLACE lock version
*/
- private Map<String, String> getAppendSegmentsCommittedDuringTask(
+ private Map<String, String> getSegmentsCoveredByTaskLock(
Review Comment:
```suggestion
private Map<String, String> getSegmentsCoveredByReplaceTaskLock(
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -206,6 +231,129 @@ public void
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}
+ @MethodSource("getPartitionsSpec")
+ @ParameterizedTest(name = "partitionsSpec={0}")
+ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec)
throws Exception
+ {
+ configureCompaction(
+ CompactionEngine.MSQ,
+ new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null)
+ );
+ KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder =
MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null))
+
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .withIoConfig(ioConfig ->
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+ // Set up first topic and supervisor
+ final String topic1 = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic1, 1);
+ final KafkaSupervisorSpec supervisor1 =
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+ cluster.callApi().postSupervisor(supervisor1);
+
+ final int totalRowCount = publish1kRecords(topic1, true) +
publish1kRecords(topic1, false);
+ waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+ // Before compaction
+ Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+ // Create a compaction config with DAY granularity
+ InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+ InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withSkipOffsetFromLatest(Period.seconds(0))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+ .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+ WikipediaStreamEventStreamGenerator.dimensions()
+ .stream()
+
.map(StringDimensionSchema::new)
+
.collect(Collectors.toUnmodifiableList())))
+ .withTaskContext(Map.of("useConcurrentLocks", true))
+ .withIoConfig(new UserCompactionTaskIOConfig(true))
+
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+ .build();
+
+ runCompactionWithSpec(dayGranularityConfig);
+ waitForAllCompactionTasksToFinish();
+
+ pauseCompaction(dayGranularityConfig);
+ Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+ Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+ Assertions.assertEquals("2000", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+
+ verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+ // published another 1k
+ final int appendedRowCount = publish1kRecords(topic1, true);
+ indexer.latchableEmitter().flush();
+ waitUntilPublishedRecordsAreIngested(appendedRowCount);
+
+ // Tear down both topics and supervisors
+ kafkaServer.deleteTopic(topic1);
+ cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+
+ long totalUsed = overlord.latchableEmitter().getMetricValues(
+ "segment/metadataCache/used/count",
+ Map.of(DruidMetrics.DATASOURCE, dataSource)
+ ).stream().reduce((first, second) -> second).orElse(0).longValue();
+
+ Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+ // 1 compacted segment + 2 appended segment
+ Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+ Assertions.assertEquals("3000", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+
+ runCompactionWithSpec(dayGranularityConfig);
+ waitForAllCompactionTasksToFinish();
+
+ // wait for new segments have been updated to the cache
+ overlord.latchableEmitter().waitForEvent(
Review Comment:
If the intent is to wait for the __next__ cache sync, do the following
instead:
```java
overlord.latchableEmitter().waitForNextEvent(
event -> event.hasMetricName("segment/metadataCache/sync/time")
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]