cecemei commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2880595371


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -202,6 +229,133 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @Test
+  public void test_minorCompactionWithMSQ() throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                                        .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false);

Review Comment:
   Added a parameterized test.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1852,9 +1853,14 @@ protected Set<DataSegment> insertSegments(
   }
 
   /**
-   * Creates new versions of segments appended while a "REPLACE" task was in progress.
+   * Retrieves segments from the upgrade segments table and creates upgraded versions with new intervals,
+   * versions, and partition numbers. Combines upgraded segments with replace segments and updates shard
+   * specs with correct core partition counts.
+   *
+   * @return pair of (upgraded segments for metadata tracking, segments to insert into segment table)
+   * @throws DruidException if a replace interval partially overlaps a segment being upgraded
    */
-  private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
+  private Pair<Set<DataSegmentPlus>, Set<DataSegment>> createNewSegmentsAfterReplace(

Review Comment:
   updated
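
   The pair-return shape described in the updated javadoc can be sketched as follows. This is a minimal, hypothetical illustration only: `Pair`, the `String` stand-ins for segments, and the `_upgraded` suffix are simplifications, not the actual Druid types or upgrade logic.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   public class PairReturnSketch
   {
     // Simple generic pair, standing in for Druid's Pair utility class.
     record Pair<L, R>(L lhs, R rhs) {}

     // Returns (upgraded segments for metadata tracking, segments to insert),
     // mirroring the two-set contract in the javadoc above.
     static Pair<Set<String>, Set<String>> createNewSegmentsAfterReplace(
         Set<String> appendedSegments,
         Set<String> replaceSegments
     )
     {
       // Create upgraded versions of the appended segments (illustrative only).
       final Set<String> upgraded = new HashSet<>();
       for (String segment : appendedSegments) {
         upgraded.add(segment + "_upgraded");
       }

       // Segments to insert = upgraded segments combined with the replace segments.
       final Set<String> toInsert = new HashSet<>(upgraded);
       toInsert.addAll(replaceSegments);

       return new Pair<>(upgraded, toInsert);
     }

     public static void main(String[] args)
     {
       Pair<Set<String>, Set<String>> result =
           createNewSegmentsAfterReplace(Set.of("seg1"), Set.of("seg2"));
       // One upgraded segment; two segments to insert (upgraded + replace).
       System.out.println(result.lhs().size() + " " + result.rhs().size());
     }
   }
   ```

   Returning both sets from one pass avoids a second metadata query: the caller tracks the upgraded set separately while inserting the combined set.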



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]