kfaraz commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1687436379
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -671,7 +671,10 @@ private static DataSchema createDataSchema(
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
- transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
+ transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null),
+ existingSegmentAnalyzer.getHasRolledUpSegments(),
+ null,
+ null
Review Comment:
Should we pass a non-null `ObjectMapper` here?
The logic in `DataSchema` is a little involved, and I am not sure whether the
`ObjectMapper` will ever be used, but it is better to be on the safe side than
to have to deal with an NPE that we can't decipher.
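The "hard-to-decipher NPE" concern can be made explicit with the standard fail-fast idiom: validate the argument at construction time so a bad call site fails immediately with a clear message. A toy sketch (`SchemaHolder` is hypothetical, not the real `DataSchema`, and a plain `Object` stands in for Jackson's `ObjectMapper`):

```java
import java.util.Objects;

// Toy stand-in for DataSchema: fail fast on a null mapper at construction
// time instead of deferring to an untraceable NPE deep inside later logic.
class SchemaHolder {
    private final Object mapper; // stand-in for com.fasterxml.jackson.databind.ObjectMapper

    SchemaHolder(Object mapper) {
        // Fail-fast: surfaces the bad argument at the call site with a clear message.
        this.mapper = Objects.requireNonNull(mapper, "mapper must not be null");
    }

    Object mapper() {
        return mapper;
    }
}
```

With this pattern, passing `null` is rejected up front rather than surfacing as an NPE somewhere in the schema-resolution logic.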
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -904,6 +913,7 @@ private void processRollup(final QueryableIndex index)
// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
+ hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
Review Comment:
```suggestion
hasRolledUpSegments = hasRolledUpSegments || Boolean.TRUE.equals(isIndexRollup);
```
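For context on the suggestion: `Boolean.valueOf(true)` returns the cached `Boolean.TRUE` instance, so both forms behave identically; `Boolean.TRUE.equals(x)` is simply the idiomatic spelling, and the `equals` call is what makes the check null-safe (no unboxing of a possibly-null `Boolean`). A minimal illustration (`RollupCheck` is a hypothetical helper, not Druid code):

```java
// Demonstrates the null-safe Boolean check: Boolean.TRUE.equals(x) returns
// false for null instead of throwing, unlike unboxing (e.g. `if (x)`).
class RollupCheck {
    static boolean isRolledUp(Boolean isIndexRollup) {
        // Handles TRUE, FALSE, and null without any unboxing NPE risk.
        return Boolean.TRUE.equals(isIndexRollup);
    }
}
```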
##########
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java:
##########
@@ -123,6 +126,30 @@ public DataSchema(
}
}
+ public DataSchema(
+ @JsonProperty("dataSource") String dataSource,
Review Comment:
There should be only one constructor with `@JsonProperty` annotations.
Also, is it possible to use one of the existing constructors instead of
adding a new one?
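The pattern the reviewer is describing: keep a single canonical constructor (in the real class, the only one carrying `@JsonProperty` annotations, so Jackson has exactly one creator to bind) and have any convenience overloads delegate to it with `this(...)`. A toy sketch with hypothetical names, annotations omitted to keep it self-contained:

```java
// Toy illustration (not the real DataSchema): one canonical constructor holds
// all field assignments; overloads delegate instead of duplicating them.
class Schema {
    private final String dataSource;
    private final Boolean hasRolledUpSegments;

    // Canonical constructor: in the real class, only this one would carry
    // @JsonProperty annotations for Jackson deserialization.
    Schema(String dataSource, Boolean hasRolledUpSegments) {
        this.dataSource = dataSource;
        this.hasRolledUpSegments = hasRolledUpSegments;
    }

    // Convenience overload delegates rather than repeating assignments.
    Schema(String dataSource) {
        this(dataSource, null);
    }

    String getDataSource() { return dataSource; }
    Boolean getHasRolledUpSegments() { return hasRolledUpSegments; }
}
```

Delegation keeps defaulting logic in one place, which is why reusing an existing constructor is usually preferable to adding a parallel annotated one.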
##########
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java:
##########
@@ -83,6 +84,7 @@ public DataSchema(
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("transformSpec") TransformSpec transformSpec,
+ @JsonProperty("hasRolledUpSegments") @Nullable Boolean hasRolledUpSegments,
Review Comment:
`isRolledUp` would probably be a better name. What do you think?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -815,6 +819,11 @@ public Boolean getRollup()
return rollup;
}
+ public Boolean getHasRolledUpSegments()
Review Comment:
```suggestion
public boolean hasRolledUpSegments()
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -469,6 +526,8 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
}
// Similar to compaction using the native engine, don't finalize aggregations.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
+ // Add appropriate finalization to native query context.
+ context.put(QueryContexts.FINALIZE_KEY, false);
Review Comment:
What is the effect of adding this? Maybe update the comment to explain why
this line is needed.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -159,6 +161,41 @@ public TaskStatus runCompactionTasks(
TaskToolbox taskToolbox
) throws Exception
{
+ for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalDataSchemas.entrySet()) {
+ if (Boolean.valueOf(true).equals(intervalDataSchema.getValue().getHasRolledUpSegments())) {
+ for (AggregatorFactory aggregatorFactory : intervalDataSchema.getValue().getAggregators()) {
+ // Don't proceed if either:
Review Comment:
This whole check should be performed as part of the
`validateCompactionTask()` invocation.
If needed, you can update the signature of `validateCompactionTask()` to
accept the data schemas.
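A sketch of what folding this loop into a single validation entry point could look like. All names here (`CompactionValidator`, `SchemaInfo`, the `validateCompactionTask` signature taking schemas) are illustrative assumptions, not the actual `MSQCompactionRunner` API:

```java
import java.util.Map;

// Hypothetical sketch: one validation pass over the per-interval schemas,
// rejecting rolled-up inputs whose aggregators are not idempotent.
class CompactionValidator {
    // Stand-in for the per-interval DataSchema details the check needs.
    record SchemaInfo(boolean hasRolledUpSegments, boolean aggregatorsAreIdempotent) {}

    static boolean validateCompactionTask(Map<String, SchemaInfo> intervalSchemas) {
        for (SchemaInfo info : intervalSchemas.values()) {
            // Rolled-up segments are only safe to recompact when every
            // aggregator is idempotent over its own output.
            if (info.hasRolledUpSegments() && !info.aggregatorsAreIdempotent()) {
                return false;
            }
        }
        return true;
    }
}
```

Centralizing the check this way keeps `runCompactionTasks` free of validation logic and gives one place to report why a task was rejected.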
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -159,6 +161,41 @@ public TaskStatus runCompactionTasks(
TaskToolbox taskToolbox
) throws Exception
{
+ for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalDataSchemas.entrySet()) {
+ if (Boolean.valueOf(true).equals(intervalDataSchema.getValue().getHasRolledUpSegments())) {
+ for (AggregatorFactory aggregatorFactory : intervalDataSchema.getValue().getAggregators()) {
+ // Don't proceed if either:
+ // - aggregator factory differs from its combining factory
+ // - input col name is different from the output name (idempotent)
Review Comment:
If we still have this check, why have we removed it from
`ClientCompactionRunnerInfo`?
##########
integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java:
##########
@@ -1829,8 +1835,9 @@ private void submitCompactionConfig(
transformSpec,
!dropExisting ? null : new UserCompactionTaskIOConfig(true),
engine,
- null
+ ImmutableMap.of("maxNumTasks", 2)
);
+ LOG.info("Submitting compaction config for engine[%s].", engine == null ? CompactionEngine.NATIVE : engine);
Review Comment:
Is this still needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]