cecemei commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2880595371
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -202,6 +229,133 @@ public void
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}
+ @Test
+ public void test_minorCompactionWithMSQ() throws Exception
+ {
+ configureCompaction(
+ CompactionEngine.MSQ,
+ new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null)
+ );
+ KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder =
MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null))
+
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .withIoConfig(ioConfig ->
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+ // Set up first topic and supervisor
+ final String topic1 = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic1, 1);
+ final KafkaSupervisorSpec supervisor1 =
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+ cluster.callApi().postSupervisor(supervisor1);
+
+ final int totalRowCount = publish1kRecords(topic1, true) +
publish1kRecords(topic1, false);
+ waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+ // Before compaction
+ Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+ PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null,
5000, List.of("page"), false);
Review Comment:
added parameterized test
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1852,9 +1853,14 @@ protected Set<DataSegment> insertSegments(
}
/**
- * Creates new versions of segments appended while a "REPLACE" task was in
progress.
+ * Retrieves segments from the upgrade segments table and creates upgraded
versions with new intervals,
+ * versions, and partition numbers. Combines upgraded segments with replace
segments and updates shard
+ * specs with correct core partition counts.
+ *
+ * @return pair of (upgraded segments for metadata tracking, segments to
insert into segment table)
+ * @throws DruidException if a replace interval partially overlaps a segment
being upgraded
*/
- private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
+ private Pair<Set<DataSegmentPlus>, Set<DataSegment>>
createNewSegmentsAfterReplace(
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]