kfaraz commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2904001518


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2169,13 +2222,13 @@ private List<DataSegmentPlus> retrieveSegmentsById(
   }
 
   /**
-   * Finds the append segments that were covered by the given task REPLACE 
locks.
-   * These append segments must now be upgraded to the same version as the 
segments
+   * Finds segments were covered by the given task REPLACE locks.

Review Comment:
   ```suggestion
      * Finds segments that were covered by the given task REPLACE locks.
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -373,7 +521,7 @@ public void 
test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrec
     cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
 
-    Assertions.assertEquals(4, getTotalRowCount());

Review Comment:
   I think it was cleaner to use the `getTotalRowCount()` method.
   Just update the contents of that method to use `cluster.runSql("SELECT 
COUNT(*)")`.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -206,6 +231,129 @@ public void 
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @MethodSource("getPartitionsSpec")
+  @ParameterizedTest(name = "partitionsSpec={0}")
+  public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) 
throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
+                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    // Create a compaction config with DAY granularity
+    InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+            .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+                WikipediaStreamEventStreamGenerator.dimensions()
+                                                   .stream()
+                                                   
.map(StringDimensionSchema::new)
+                                                   
.collect(Collectors.toUnmodifiableList())))
+            .withTaskContext(Map.of("useConcurrentLocks", true))
+            .withIoConfig(new UserCompactionTaskIOConfig(true))
+            
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+            .build();
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    pauseCompaction(dayGranularityConfig);
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals("2000", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
+
+    verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+    // published another 1k
+    final int appendedRowCount = publish1kRecords(topic1, true);
+    indexer.latchableEmitter().flush();
+    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+
+    // Tear down both topics and supervisors
+    kafkaServer.deleteTopic(topic1);
+    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+
+    long totalUsed = overlord.latchableEmitter().getMetricValues(

Review Comment:
   This need not give the correct value of total used segments currently 
present in the cache.
   This metric is emitted after every cache sync, and as such the total 
aggregate would be higher than the actual number of segments.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1867,36 +1874,40 @@ private Set<DataSegmentPlus> 
createNewIdsOfAppendSegmentsAfterReplace(
       return Collections.emptySet();
     }
 
-    // For each replace interval, find the number of core partitions and total 
partitions
-    final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
+    // For each replace interval, find the current partition number
     final Map<Interval, Integer> intervalToCurrentPartitionNum = new 
HashMap<>();
+    // if numChunkNotSupported by all segments in an interval, we can't update 
the corePartitions in shardSpec
+    final Set<Interval> numChunkNotSupported = new HashSet<>();
     for (DataSegment segment : replaceSegments) {
-      intervalToNumCorePartitions.put(segment.getInterval(), 
segment.getShardSpec().getNumCorePartitions());
-
       int partitionNum = segment.getShardSpec().getPartitionNum();
       intervalToCurrentPartitionNum.compute(
           segment.getInterval(),
           (i, value) -> value == null ? partitionNum : Math.max(value, 
partitionNum)
       );
+      if (!segment.isTombstone() && 
!segment.getShardSpec().canCreateNumberedPartitionChunk()) {
+        numChunkNotSupported.add(segment.getInterval());
+      }
     }
 
     // Find the segments that need to be upgraded
     final String taskId = locksHeldByReplaceTask.stream()
                                                 
.map(ReplaceTaskLock::getSupervisorTaskId)
                                                 .findFirst().orElse(null);
     final Map<String, String> upgradeSegmentToLockVersion
-        = getAppendSegmentsCommittedDuringTask(transaction, taskId);
+        = getSegmentsCoveredByTaskLock(transaction, taskId);
 
     final List<DataSegmentPlus> segmentsToUpgrade
         = retrieveSegmentsById(dataSource, transaction, 
upgradeSegmentToLockVersion.keySet());
 
     if (segmentsToUpgrade.isEmpty()) {

Review Comment:
   A short comment might help here:
   
   ```suggestion
       // If there is nothing to upgrade, return the replaceSegments unchanged
       if (segmentsToUpgrade.isEmpty()) {
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -626,12 +735,8 @@ private void verifyNoRowsWithNestedValue(String 
nestedColumn, String field, Stri
         field,
         value
     );
-    String result = cluster.callApi().onAnyBroker(b -> b.submitSqlQuery(new 
ClientSqlQuery(sql, null, false, false, false, null, null)));
-    List<Map<String, Object>> rows = JacksonUtils.readValue(
-        new DefaultObjectMapper(),
-        result.getBytes(StandardCharsets.UTF_8),
-        new TypeReference<>() {}
-    );
+    ClientSqlQuery clientSqlQuery = new ClientSqlQuery(sql, null, false, 
false, false, null, null);
+    List<Map<String, Object>> rows = parse(cluster.callApi().onAnyBroker(b -> 
b.submitSqlQuery(clientSqlQuery)));

Review Comment:
   `parse` method should not be needed. Ideally, we should be simply using 
`cluster.runSql()` for all of these cases.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -588,33 +586,37 @@ static Map<QuerySegmentSpec, DataSchema> 
createInputDataSchemas(
       Iterable<DataSegment> segmentsNotCompletelyWithinin =
           Iterables.filter(timelineSegments, s -> 
!segmentProvider.interval.contains(s.getInterval()));
       if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
-        throw new ISE(
-            "Incremental compaction doesn't allow segments not completely 
within interval[%s]",
-            segmentProvider.interval
-        );
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build(
+                                "Minor compaction doesn't allow segments not 
completely within interval[%s]",
+                                segmentProvider.interval
+                            );
       }
     }
 
     if (granularitySpec == null || granularitySpec.getSegmentGranularity() == 
null) {
       Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
-      // if segment is already compacted in incremental compaction, they need 
to be upgraded directly, supported in MSQ
-      List<DataSegment> upgradeSegments = new ArrayList<>();
+      // if segment is already compacted in minor compaction, they need to be 
upgraded directly, supported in MSQ
+      Set<DataSegment> segmentsToUpgrade = new HashSet<>();
 
       // original granularity
       final Map<Interval, List<DataSegment>> intervalToSegments = new 
TreeMap<>(
           Comparators.intervalsByStartThenEnd()
       );
 
       for (final DataSegment dataSegment : timelineSegments) {
-        if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) {
-          upgradeSegments.add(dataSegment);
+        if (segmentProvider.shouldUpgradeSegment(dataSegment)) {
+          segmentsToUpgrade.add(dataSegment);
         } else {
           intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> 
new ArrayList<>())
                             .add(dataSegment);
         }
       }
-      if (!upgradeSegments.isEmpty()) {
-        toolbox.getTaskActionClient().submit(new 
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));
+      if (!segmentsToUpgrade.isEmpty()) {
+        log.info("Marking [%d]segments to upgrade", segmentsToUpgrade.size());

Review Comment:
   I think we should include some (if not all) of the segment IDs in the log 
too. It might be useful in debugging.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1972,35 @@ private Set<DataSegmentPlus> 
createNewIdsOfAppendSegmentsAfterReplace(
               oldSegmentMetadata.getIndexingStateFingerprint()
           )
       );
+      segmentsToInsert.add(dataSegment);
     }
 
-    return upgradedSegments;
+    return segmentsToInsert.stream().map(segment -> {
+      // update corePartitions in shard spec
+      Integer partitionNum = 
intervalToCurrentPartitionNum.get(segment.getInterval());
+      if (!segment.isTombstone() && 
!numChunkNotSupported.contains(segment.getInterval()) && partitionNum != null) {
+        return 
segment.withShardSpec(segment.getShardSpec().withCorePartitions(partitionNum + 
1));

Review Comment:
   This doesn't seem correct. The number of core partitions should be the total 
number of segments i.e. replaceSegments + upgradedSegments.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2169,13 +2222,13 @@ private List<DataSegmentPlus> retrieveSegmentsById(
   }
 
   /**
-   * Finds the append segments that were covered by the given task REPLACE 
locks.
-   * These append segments must now be upgraded to the same version as the 
segments
+   * Finds segments were covered by the given task REPLACE locks.
+   * These segments must now be upgraded to the same version as the segments
    * being committed by this replace task.
    *
    * @return Map from append Segment ID to REPLACE lock version
    */
-  private Map<String, String> getAppendSegmentsCommittedDuringTask(
+  private Map<String, String> getSegmentsCoveredByTaskLock(

Review Comment:
   ```suggestion
     private Map<String, String> getSegmentsCoveredByReplaceTaskLock(
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -206,6 +231,129 @@ public void 
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @MethodSource("getPartitionsSpec")
+  @ParameterizedTest(name = "partitionsSpec={0}")
+  public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) 
throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
+                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    // Create a compaction config with DAY granularity
+    InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+            .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+                WikipediaStreamEventStreamGenerator.dimensions()
+                                                   .stream()
+                                                   
.map(StringDimensionSchema::new)
+                                                   
.collect(Collectors.toUnmodifiableList())))
+            .withTaskContext(Map.of("useConcurrentLocks", true))
+            .withIoConfig(new UserCompactionTaskIOConfig(true))
+            
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+            .build();
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    pauseCompaction(dayGranularityConfig);
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals("2000", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
+
+    verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+    // published another 1k
+    final int appendedRowCount = publish1kRecords(topic1, true);
+    indexer.latchableEmitter().flush();
+    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+
+    // Tear down both topics and supervisors
+    kafkaServer.deleteTopic(topic1);
+    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+
+    long totalUsed = overlord.latchableEmitter().getMetricValues(
+        "segment/metadataCache/used/count",
+        Map.of(DruidMetrics.DATASOURCE, dataSource)
+    ).stream().reduce((first, second) -> second).orElse(0).longValue();
+
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    // 1 compacted segment + 2 appended segment
+    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals("3000", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    // wait for new segments have been updated to the cache
+    overlord.latchableEmitter().waitForEvent(

Review Comment:
   If the intent is to wait for the __next__ cache sync, do the following 
instead:
   
   ```java
   overlord.latchableEmitter().waitForNextEvent(
       event -> event.hasMetricName("segment/metadataCache/sync/time")
   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to