kfaraz commented on code in PR #15965:
URL: https://github.com/apache/druid/pull/15965#discussion_r1546076578


##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
                                            .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
                                            .shardSpec(getShardSpec(7))
                                            .size(0)
-                                           .build();
+                                            .build();

Review Comment:
   Nit: unneeded change.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1780,7 +1891,8 @@ private static QueryDefinition makeQueryDefinition(
       }
     } else {
       shuffleSpecFactory = querySpec.getDestination()
-                                    .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
+                                    .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery()
+                                                                                                          .context()));

Review Comment:
   Nit: style
   ```suggestion
                                       .getShuffleSpecFactory(
                                           MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
                                       );
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+              .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(

Review Comment:
   Why is this public? Is it used in a test anywhere?
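   (If it is public only so tests can call it, one hedged alternative — a sketch, assuming Guava's `@VisibleForTesting`, which is already used across Druid — is to keep it non-public and mark it:)
   ```java
   // Sketch: documents that the wider-than-necessary visibility exists only for tests.
   @VisibleForTesting
   static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(
       MSQControllerTask task,
       ObjectMapper jsonMapper,
       DataSchema dataSchema,
       ShardSpec shardSpec,
       String queryId
   )
   ```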



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+              .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(
+      MSQControllerTask task,
+      ObjectMapper jsonMapper,
+      DataSchema dataSchema,
+      ShardSpec shardSpec,
+      String queryId
+  )
+  {
+    PartitionsSpec partitionSpec;
+
+    if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+         || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+      partitionSpec = new DimensionRangePartitionsSpec(
+          task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
+          null,
+          partitionDimensions,
+          false
+      );
+
+    } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+      partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null);
+    } else {
+      log.error(
+          "Query [%s] skipping storing compaction state in segments as shard spec of unsupported type [%s].",
+          queryId, shardSpec.getType()
+      );
+      return Function.identity();
     }
+
+    Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec()
+                                                                     .getDestination()).getSegmentGranularity();
+
+    GranularitySpec granularitySpec = new UniformGranularitySpec(
+        segmentGranularity,
+        dataSchema.getGranularitySpec().getQueryGranularity(),
+        dataSchema.getGranularitySpec().isRollup(),
+        dataSchema.getGranularitySpec().inputIntervals()
+    );
+
+    DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
+    Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+                                        ? null
+                                        : new ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec()
+                                                                                          .getFilter()).asMap(
+                                            jsonMapper);
+    List<Object> metricsSpec = dataSchema.getAggregators() == null
+                               ? null
+                               : jsonMapper.convertValue(
+                                   dataSchema.getAggregators(), new TypeReference<List<Object>>()
+                                   {
+                                   });
+
+
+    IndexSpec indexSpec = task.getQuerySpec().getTuningConfig().getIndexSpec();
+
+    log.info("Query [%s] storing compaction state in segments.", queryId);

Review Comment:
   ```suggestion
       log.info("Query[%s] storing compaction state in segments.", queryId);
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+              .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(
+      MSQControllerTask task,
+      ObjectMapper jsonMapper,
+      DataSchema dataSchema,
+      ShardSpec shardSpec,
+      String queryId
+  )
+  {
+    PartitionsSpec partitionSpec;
+
+    if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+         || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+      partitionSpec = new DimensionRangePartitionsSpec(
+          task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
+          null,
+          partitionDimensions,
+          false
+      );
+
+    } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+      partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null);
+    } else {
+      log.error(
+          "Query [%s] skipping storing compaction state in segments as shard spec of unsupported type [%s].",
+          queryId, shardSpec.getType()
+      );
+      return Function.identity();
     }
+
+    Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec()
+                                                                     .getDestination()).getSegmentGranularity();
+
+    GranularitySpec granularitySpec = new UniformGranularitySpec(
+        segmentGranularity,
+        dataSchema.getGranularitySpec().getQueryGranularity(),
+        dataSchema.getGranularitySpec().isRollup(),
+        dataSchema.getGranularitySpec().inputIntervals()
+    );
+
+    DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
+    Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+                                        ? null
+                                        : new ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec()
+                                                                                          .getFilter()).asMap(
+                                            jsonMapper);

Review Comment:
   Please fix the formatting here.
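   For example, one possible reflow (a sketch only; logic unchanged):
   ```java
   Map<String, Object> transformSpec =
       TransformSpec.NONE.equals(dataSchema.getTransformSpec())
       ? null
       : new ClientCompactionTaskTransformSpec(
           dataSchema.getTransformSpec().getFilter()
       ).asMap(jsonMapper);
   ```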



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -60,14 +75,18 @@
 public class MSQReplaceTest extends MSQTestBase
 {
 
-  private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
-  private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+  private static final String WITH_REPLACE_LOCK_AND_COMPACTION_STATE = "with_replace_lock_and_compaction_state";
+  private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE =
       ImmutableMap.<String, Object>builder()
                   .putAll(DEFAULT_MSQ_CONTEXT)
                   .put(
                       Tasks.TASK_LOCK_TYPE,
                       StringUtils.toLowerCase(TaskLockType.REPLACE.name())
                   )
+                  .put(
+                      Tasks.STORE_COMPACTION_STATE_KEY,
+                      true
+                  )

Review Comment:
   ```suggestion
                     .put(Tasks.STORE_COMPACTION_STATE_KEY, true)
   ```



##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
                                            .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
                                            .shardSpec(getShardSpec(7))
                                            .size(0)
-                                           .build();
+                                            .build();
     Assert.assertEquals(segment1, segment2.withLastCompactionState(compactionState));
   }
 
+  @Test
+  public void testAnnotateWithLastCompactionState()
+  {
+    final CompactionState compactionState = new CompactionState(
+        new DynamicPartitionsSpec(null, null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+        ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")),
+        ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")),
+        Collections.singletonMap("test", "map"),
+        Collections.singletonMap("test2", "map2")
+    );
+
+    final Function<Set<DataSegment>, Set<DataSegment>> annotateFn = CompactionState.compactionStateAnnotateFunction(
+        new DynamicPartitionsSpec(null, null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+        ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")),
+        ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")),

Review Comment:
   Assign these values to variables and reuse the variables in the two places, for clarity that the two are actually the same.
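   A rough sketch (variable names and declared types here are just illustrative):
   ```java
   final DynamicPartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
   final DimensionsSpec dimensionsSpec =
       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")));
   final List<Object> metricsSpec =
       ImmutableList.of(ImmutableMap.of("type", "count", "name", "count"));
   final Map<String, Object> transformSpec =
       ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo"));

   // The same variables then feed both the expected state and the function under test.
   final CompactionState compactionState = new CompactionState(
       partitionsSpec,
       dimensionsSpec,
       metricsSpec,
       transformSpec,
       Collections.singletonMap("test", "map"),
       Collections.singletonMap("test2", "map2")
   );
   ```
   and then pass the same four variables as the leading arguments of `CompactionState.compactionStateAnnotateFunction(...)`.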



##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
                                            .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
                                            .shardSpec(getShardSpec(7))
                                            .size(0)
-                                           .build();
+                                            .build();
     Assert.assertEquals(segment1, segment2.withLastCompactionState(compactionState));
   }
 
+  @Test
+  public void testAnnotateWithLastCompactionState()

Review Comment:
   Thanks for adding this test!



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -853,6 +950,15 @@ public void testReplaceAllOverEternitySegment(String contextName, Map<String, Ob
                              new Object[]{946771200000L, 2.0f}
                          )
                      )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             Collections.singletonList(new FloatDimensionSchema(
+                                 "m1")),

Review Comment:
   ```suggestion
                             Collections.singletonList(new FloatDimensionSchema("m1")),
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+              .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(
+      MSQControllerTask task,
+      ObjectMapper jsonMapper,
+      DataSchema dataSchema,
+      ShardSpec shardSpec,
+      String queryId
+  )
+  {
+    PartitionsSpec partitionSpec;
+
+    if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+         || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+      partitionSpec = new DimensionRangePartitionsSpec(

Review Comment:
   Since we are using `DimensionRangePartitionsSpec` for single-dim segments, is it possible that segments partitioned by single-dim would get re-picked by compaction if the compaction config has `single` as the desired state?
   I am not entirely sure if we still allow users to use `single` in the compaction config.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+              .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(
+      MSQControllerTask task,
+      ObjectMapper jsonMapper,
+      DataSchema dataSchema,
+      ShardSpec shardSpec,
+      String queryId
+  )
+  {
+    PartitionsSpec partitionSpec;
+
+    if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+         || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+      partitionSpec = new DimensionRangePartitionsSpec(
+          task.getQuerySpec().getTuningConfig().getRowsPerSegment(),

Review Comment:
   Assign `task.getQuerySpec().getTuningConfig()` to a variable, as it is used in several places. That would help with the readability of this whole method.
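   E.g. (a sketch; assuming the tuning config type here is `MSQTuningConfig`):
   ```java
   final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();

   // ... then reuse it wherever the tuning config is consulted, e.g.
   partitionSpec = new DimensionRangePartitionsSpec(
       tuningConfig.getRowsPerSegment(),
       null,
       partitionDimensions,
       false
   );
   ```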



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -948,6 +1065,70 @@ public void testReplaceSegmentsInsertIntoNewTable(String contextName, Map<String
                              new Object[]{978480000000L, 6.0f}
                          )
                      )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             Collections.singletonList(new FloatDimensionSchema("m1")),
+                             GranularityType.ALL
+                         )
+                     )
+                     .verifyResults();
+  }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context)
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("m1", ColumnType.FLOAT)
+                                            .add("m2", ColumnType.DOUBLE)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foobar "
+                             + "OVERWRITE ALL "
+                             + "SELECT __time, m1, m2 "
+                             + "FROM foo "
+                             + "PARTITIONED by TIME_FLOOR(__time, 'P3M') ")
+                     .setExpectedDataSource("foobar")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                                                             "foobar",
+                                                             Intervals.of(
+                                                                 "2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"),
+                                                             "test",
+                                                             0
+                                                         ),
+                                                         SegmentId.of(
+                                                             "foobar",
+                                                             Intervals.of(
+                                                                 "2001-01-01T00:00:00.000Z/2001-04-01T00:00:00.000Z"),
+                                                             "test",
+                                                             0
+                                                         )
+                                         )
+                     )

Review Comment:
   Formatting:
   ```suggestion
                        .setExpectedSegment(
                            ImmutableSet.of(
                                SegmentId.of(
                                       "foobar",
                                       Intervals.of("2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"),
                                       "test",
                                        0
                                 ),
                                 
   ```
   and so on.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1641,4 +1846,48 @@ private List<Object[]> expectedFooRows()
     ));
     return expectedRows;
   }
+
+  private CompactionState expectedCompactionState(
+      Map<String, Object> context, List<String> partitionDimensions, List<DimensionSchema> dimensions,

Review Comment:
   Cleaner to have each arg on a separate line.
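   E.g. (a sketch; the trailing parameter and its name are inferred from the call sites above):
   ```java
   private CompactionState expectedCompactionState(
       Map<String, Object> context,
       List<String> partitionDimensions,
       List<DimensionSchema> dimensions,
       GranularityType segmentGranularity
   )
   ```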



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -583,7 +583,7 @@ public static boolean isGuaranteedRollup(
     return tuningConfig.isForceGuaranteedRollup();
   }
 
-  public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
+  public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction(

Review Comment:
   Nit: Why is this rename needed?
   This method also returns a `Function`, same as `CompactionState.compactionStateAnnotateFunction`.
   
   Either way, I think the name of these functions should clarify what they do, i.e. `addCompactionStateToSegments`.
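   E.g. (a sketch of the suggested naming only):
   ```java
   // Same parameters as before, elided here.
   public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
       // ...
   )
   ```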



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

