kfaraz commented on code in PR #15965:
URL: https://github.com/apache/druid/pull/15965#discussion_r1546076578
##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(getShardSpec(7))
.size(0)
- .build();
+ .build();
Review Comment:
Nit: unneeded change.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1780,7 +1891,8 @@ private static QueryDefinition makeQueryDefinition(
}
} else {
shuffleSpecFactory = querySpec.getDestination()
-
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
+
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery()
+
.context()));
Review Comment:
Nit: style
```suggestion
.getShuffleSpecFactory(
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
);
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>)
queryKernel.getResultObjectForStage(finalStageId);
+
+ Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction = Function.identity();
+
+ boolean storeCompactionState =
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+
Tasks.STORE_COMPACTION_STATE_KEY,
+
Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Only do this for replace queries, whether originating directly or
via compaction
+ log.error("storeCompactionState flag set for a non-REPLACE query
[%s]", queryDef.getQueryId());
+ } else {
+
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory)
queryKernel
+
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+ ShardSpec shardSpec =
segments.stream().findFirst().get().getShardSpec();
+
+ compactionStateAnnotateFunction =
prepareCompactionStateAnnotateFunction(
+ task(),
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ }
+ }
+
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(),
segments.size());
- publishAllSegments(segments);
+ publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+ }
+ }
+
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
Review Comment:
Why is this public? Is it used in a test anywhere?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>)
queryKernel.getResultObjectForStage(finalStageId);
+
+ Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction = Function.identity();
+
+ boolean storeCompactionState =
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+
Tasks.STORE_COMPACTION_STATE_KEY,
+
Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Only do this for replace queries, whether originating directly or
via compaction
+ log.error("storeCompactionState flag set for a non-REPLACE query
[%s]", queryDef.getQueryId());
+ } else {
+
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory)
queryKernel
+
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+ ShardSpec shardSpec =
segments.stream().findFirst().get().getShardSpec();
+
+ compactionStateAnnotateFunction =
prepareCompactionStateAnnotateFunction(
+ task(),
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ }
+ }
+
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(),
segments.size());
- publishAllSegments(segments);
+ publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+ }
+ }
+
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
+ MSQControllerTask task,
+ ObjectMapper jsonMapper,
+ DataSchema dataSchema,
+ ShardSpec shardSpec,
+ String queryId
+ )
+ {
+ PartitionsSpec partitionSpec;
+
+ if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+ || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+ List<String> partitionDimensions = ((DimensionRangeShardSpec)
shardSpec).getDimensions();
+ partitionSpec = new DimensionRangePartitionsSpec(
+ task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
+ null,
+ partitionDimensions,
+ false
+ );
+
+ } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+ partitionSpec = new
DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
null);
+ } else {
+ log.error(
+ "Query [%s] skipping storing compaction state in segments as shard
spec of unsupported type [%s].",
+ queryId, shardSpec.getType()
+ );
+ return Function.identity();
}
+
+ Granularity segmentGranularity = ((DataSourceMSQDestination)
task.getQuerySpec()
+
.getDestination()).getSegmentGranularity();
+
+ GranularitySpec granularitySpec = new UniformGranularitySpec(
+ segmentGranularity,
+ dataSchema.getGranularitySpec().getQueryGranularity(),
+ dataSchema.getGranularitySpec().isRollup(),
+ dataSchema.getGranularitySpec().inputIntervals()
+ );
+
+ DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
+ Map<String, Object> transformSpec =
TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+ ? null
+ : new
ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec()
+
.getFilter()).asMap(
+ jsonMapper);
+ List<Object> metricsSpec = dataSchema.getAggregators() == null
+ ? null
+ : jsonMapper.convertValue(
+ dataSchema.getAggregators(), new
TypeReference<List<Object>>()
+ {
+ });
+
+
+ IndexSpec indexSpec = task.getQuerySpec().getTuningConfig().getIndexSpec();
+
+ log.info("Query [%s] storing compaction state in segments.", queryId);
Review Comment:
```suggestion
log.info("Query[%s] storing compaction state in segments.", queryId);
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>)
queryKernel.getResultObjectForStage(finalStageId);
+
+ Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction = Function.identity();
+
+ boolean storeCompactionState =
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+
Tasks.STORE_COMPACTION_STATE_KEY,
+
Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Only do this for replace queries, whether originating directly or
via compaction
+ log.error("storeCompactionState flag set for a non-REPLACE query
[%s]", queryDef.getQueryId());
+ } else {
+
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory)
queryKernel
+
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+ ShardSpec shardSpec =
segments.stream().findFirst().get().getShardSpec();
+
+ compactionStateAnnotateFunction =
prepareCompactionStateAnnotateFunction(
+ task(),
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ }
+ }
+
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(),
segments.size());
- publishAllSegments(segments);
+ publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+ }
+ }
+
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
+ MSQControllerTask task,
+ ObjectMapper jsonMapper,
+ DataSchema dataSchema,
+ ShardSpec shardSpec,
+ String queryId
+ )
+ {
+ PartitionsSpec partitionSpec;
+
+ if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+ || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+ List<String> partitionDimensions = ((DimensionRangeShardSpec)
shardSpec).getDimensions();
+ partitionSpec = new DimensionRangePartitionsSpec(
+ task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
+ null,
+ partitionDimensions,
+ false
+ );
+
+ } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+ partitionSpec = new
DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
null);
+ } else {
+ log.error(
+ "Query [%s] skipping storing compaction state in segments as shard
spec of unsupported type [%s].",
+ queryId, shardSpec.getType()
+ );
+ return Function.identity();
}
+
+ Granularity segmentGranularity = ((DataSourceMSQDestination)
task.getQuerySpec()
+
.getDestination()).getSegmentGranularity();
+
+ GranularitySpec granularitySpec = new UniformGranularitySpec(
+ segmentGranularity,
+ dataSchema.getGranularitySpec().getQueryGranularity(),
+ dataSchema.getGranularitySpec().isRollup(),
+ dataSchema.getGranularitySpec().inputIntervals()
+ );
+
+ DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
+ Map<String, Object> transformSpec =
TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+ ? null
+ : new
ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec()
+
.getFilter()).asMap(
+ jsonMapper);
Review Comment:
Please fix the formatting here.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -60,14 +75,18 @@
public class MSQReplaceTest extends MSQTestBase
{
- private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
- private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+ private static final String WITH_REPLACE_LOCK_AND_COMPACTION_STATE =
"with_replace_lock_and_compaction_state";
+ private static final Map<String, Object>
QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
Tasks.TASK_LOCK_TYPE,
StringUtils.toLowerCase(TaskLockType.REPLACE.name())
)
+ .put(
+ Tasks.STORE_COMPACTION_STATE_KEY,
+ true
+ )
Review Comment:
```suggestion
.put(Tasks.STORE_COMPACTION_STATE_KEY, true)
```
##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(getShardSpec(7))
.size(0)
- .build();
+ .build();
Assert.assertEquals(segment1,
segment2.withLastCompactionState(compactionState));
}
+ @Test
+ public void testAnnotateWithLastCompactionState()
+ {
+ final CompactionState compactionState = new CompactionState(
+ new DynamicPartitionsSpec(null, null),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar",
"foo"))),
+ ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")),
+ ImmutableMap.of("filter", ImmutableMap.of("type", "selector",
"dimension", "dim1", "value", "foo")),
+ Collections.singletonMap("test", "map"),
+ Collections.singletonMap("test2", "map2")
+ );
+
+ final Function<Set<DataSegment>, Set<DataSegment>> annotateFn =
CompactionState.compactionStateAnnotateFunction(
+ new DynamicPartitionsSpec(null, null),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar",
"foo"))),
+ ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")),
+ ImmutableMap.of("filter", ImmutableMap.of("type", "selector",
"dimension", "dim1", "value", "foo")),
Review Comment:
Assign these values to variables and reuse them in both places, so that it
is clear that the two are actually the same.
##########
processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java:
##########
@@ -361,10 +364,48 @@ public void testWithLastCompactionState()
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(getShardSpec(7))
.size(0)
- .build();
+ .build();
Assert.assertEquals(segment1,
segment2.withLastCompactionState(compactionState));
}
+ @Test
+ public void testAnnotateWithLastCompactionState()
Review Comment:
Thanks for adding this test!
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -853,6 +950,15 @@ public void testReplaceAllOverEternitySegment(String
contextName, Map<String, Ob
new Object[]{946771200000L, 2.0f}
)
)
+ .setExpectedLastCompactionState(
+ expectedCompactionState(
+ context,
+ Collections.emptyList(),
+ Collections.singletonList(new
FloatDimensionSchema(
+ "m1")),
Review Comment:
```suggestion
Collections.singletonList(new FloatDimensionSchema("m1")),
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>)
queryKernel.getResultObjectForStage(finalStageId);
+
+ Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction = Function.identity();
+
+ boolean storeCompactionState =
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+
Tasks.STORE_COMPACTION_STATE_KEY,
+
Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Only do this for replace queries, whether originating directly or
via compaction
+ log.error("storeCompactionState flag set for a non-REPLACE query
[%s]", queryDef.getQueryId());
+ } else {
+
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory)
queryKernel
+
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+ ShardSpec shardSpec =
segments.stream().findFirst().get().getShardSpec();
+
+ compactionStateAnnotateFunction =
prepareCompactionStateAnnotateFunction(
+ task(),
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ }
+ }
+
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(),
segments.size());
- publishAllSegments(segments);
+ publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+ }
+ }
+
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
+ MSQControllerTask task,
+ ObjectMapper jsonMapper,
+ DataSchema dataSchema,
+ ShardSpec shardSpec,
+ String queryId
+ )
+ {
+ PartitionsSpec partitionSpec;
+
+ if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+ || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+ List<String> partitionDimensions = ((DimensionRangeShardSpec)
shardSpec).getDimensions();
+ partitionSpec = new DimensionRangePartitionsSpec(
Review Comment:
Since we are using `DimensionRangePartitionsSpec` for single-dim segments,
is it possible that segments partitioned by single-dim would get re-picked by
compaction if the compaction config has `single` as the desired state?
I am not entirely sure whether we still allow users to use `single` in the
compaction config.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>)
queryKernel.getResultObjectForStage(finalStageId);
+
+ Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction = Function.identity();
+
+ boolean storeCompactionState =
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+
Tasks.STORE_COMPACTION_STATE_KEY,
+
Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Only do this for replace queries, whether originating directly or
via compaction
+ log.error("storeCompactionState flag set for a non-REPLACE query
[%s]", queryDef.getQueryId());
+ } else {
+
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory)
queryKernel
+
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+ ShardSpec shardSpec =
segments.stream().findFirst().get().getShardSpec();
+
+ compactionStateAnnotateFunction =
prepareCompactionStateAnnotateFunction(
+ task(),
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ }
+ }
+
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(),
segments.size());
- publishAllSegments(segments);
+ publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+ }
+ }
+
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
+ MSQControllerTask task,
+ ObjectMapper jsonMapper,
+ DataSchema dataSchema,
+ ShardSpec shardSpec,
+ String queryId
+ )
+ {
+ PartitionsSpec partitionSpec;
+
+ if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+ || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+ List<String> partitionDimensions = ((DimensionRangeShardSpec)
shardSpec).getDimensions();
+ partitionSpec = new DimensionRangePartitionsSpec(
+ task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
Review Comment:
Assign `task.getQuerySpec().getTuningConfig()` to some variable as it is
used in several places. That would help with the readability of this whole
method.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -948,6 +1065,70 @@ public void testReplaceSegmentsInsertIntoNewTable(String
contextName, Map<String
new Object[]{978480000000L, 6.0f}
)
)
+ .setExpectedLastCompactionState(
+ expectedCompactionState(
+ context,
+ Collections.emptyList(),
+ Collections.singletonList(new
FloatDimensionSchema("m1")),
+ GranularityType.ALL
+ )
+ )
+ .verifyResults();
+ }
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testReplaceSegmentsWithQuarterSegmentGranularity(String
contextName, Map<String, Object> context)
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("m1", ColumnType.FLOAT)
+ .add("m2", ColumnType.DOUBLE)
+ .build();
+
+ testIngestQuery().setSql(" REPLACE INTO foobar "
+ + "OVERWRITE ALL "
+ + "SELECT __time, m1, m2 "
+ + "FROM foo "
+ + "PARTITIONED by TIME_FLOOR(__time, 'P3M') ")
+ .setExpectedDataSource("foobar")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+ "foobar",
+ Intervals.of(
+
"2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"),
+ "test",
+ 0
+ ),
+ SegmentId.of(
+ "foobar",
+ Intervals.of(
+
"2001-01-01T00:00:00.000Z/2001-04-01T00:00:00.000Z"),
+ "test",
+ 0
+ )
+ )
+ )
Review Comment:
Formatting:
```suggestion
.setExpectedSegment(
ImmutableSet.of(
SegmentId.of(
"foobar",
Intervals.of("2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"),
"test",
0
),
```
and so on.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1641,4 +1846,48 @@ private List<Object[]> expectedFooRows()
));
return expectedRows;
}
+
+ private CompactionState expectedCompactionState(
+ Map<String, Object> context, List<String> partitionDimensions,
List<DimensionSchema> dimensions,
Review Comment:
It would be cleaner to have each arg on a separate line.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -583,7 +583,7 @@ public static boolean isGuaranteedRollup(
return tuningConfig.isForceGuaranteedRollup();
}
- public static Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction(
+ public static Function<Set<DataSegment>, Set<DataSegment>>
prepareCompactionStateAnnotateFunction(
Review Comment:
Nit: Why is this rename needed?
This method also returns a `Function`, the same as
`CompactionState.compactionStateAnnotateFunction`.
Either way, I think the names of these functions should clarify what they do,
e.g. `addCompactionStateToSegments`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]