kfaraz commented on code in PR #15965: URL: https://github.com/apache/druid/pull/15965#discussion_r1546076578
########## processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java: ########## @@ -361,10 +364,48 @@ public void testWithLastCompactionState() .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) .shardSpec(getShardSpec(7)) .size(0) - .build(); + .build(); Review Comment: Nit: unneeded change. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1780,7 +1891,8 @@ private static QueryDefinition makeQueryDefinition( } } else { shuffleSpecFactory = querySpec.getDestination() - .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())); + .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery() + .context())); Review Comment: Nit: style ```suggestion .getShuffleSpecFactory( MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()) ); ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = 
((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } + } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( Review Comment: Why is this public? Is it used in a test anywhere? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = 
segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } + } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( + MSQControllerTask task, + ObjectMapper jsonMapper, + DataSchema dataSchema, + ShardSpec shardSpec, + String queryId + ) + { + PartitionsSpec partitionSpec; + + if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE) + || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) { + List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + task.getQuerySpec().getTuningConfig().getRowsPerSegment(), + null, + partitionDimensions, + false + ); + + } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { + partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null); + } else { + log.error( + "Query [%s] skipping storing compaction state in segments as shard spec of unsupported type [%s].", + queryId, shardSpec.getType() + ); + return Function.identity(); } + + Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec() + .getDestination()).getSegmentGranularity(); + + GranularitySpec granularitySpec = new UniformGranularitySpec( + segmentGranularity, + dataSchema.getGranularitySpec().getQueryGranularity(), + dataSchema.getGranularitySpec().isRollup(), + dataSchema.getGranularitySpec().inputIntervals() + ); + + DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec(); + Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec()) + 
? null + : new ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec() + .getFilter()).asMap( + jsonMapper); + List<Object> metricsSpec = dataSchema.getAggregators() == null + ? null + : jsonMapper.convertValue( + dataSchema.getAggregators(), new TypeReference<List<Object>>() + { + }); + + + IndexSpec indexSpec = task.getQuerySpec().getTuningConfig().getIndexSpec(); + + log.info("Query [%s] storing compaction state in segments.", queryId); Review Comment: ```suggestion log.info("Query[%s] storing compaction state in segments.", queryId); ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } 
+ } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( + MSQControllerTask task, + ObjectMapper jsonMapper, + DataSchema dataSchema, + ShardSpec shardSpec, + String queryId + ) + { + PartitionsSpec partitionSpec; + + if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE) + || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) { + List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + task.getQuerySpec().getTuningConfig().getRowsPerSegment(), + null, + partitionDimensions, + false + ); + + } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { + partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null); + } else { + log.error( + "Query [%s] skipping storing compaction state in segments as shard spec of unsupported type [%s].", + queryId, shardSpec.getType() + ); + return Function.identity(); } + + Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec() + .getDestination()).getSegmentGranularity(); + + GranularitySpec granularitySpec = new UniformGranularitySpec( + segmentGranularity, + dataSchema.getGranularitySpec().getQueryGranularity(), + dataSchema.getGranularitySpec().isRollup(), + dataSchema.getGranularitySpec().inputIntervals() + ); + + DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec(); + Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec()) + ? null + : new ClientCompactionTaskTransformSpec(dataSchema.getTransformSpec() + .getFilter()).asMap( + jsonMapper); Review Comment: Please fix the formatting here. 
########## extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java: ########## @@ -60,14 +75,18 @@ public class MSQReplaceTest extends MSQTestBase { - private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK"; - private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK = + private static final String WITH_REPLACE_LOCK_AND_COMPACTION_STATE = "with_replace_lock_and_compaction_state"; + private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE = ImmutableMap.<String, Object>builder() .putAll(DEFAULT_MSQ_CONTEXT) .put( Tasks.TASK_LOCK_TYPE, StringUtils.toLowerCase(TaskLockType.REPLACE.name()) ) + .put( + Tasks.STORE_COMPACTION_STATE_KEY, + true + ) Review Comment: ```suggestion .put(Tasks.STORE_COMPACTION_STATE_KEY, true) ``` ########## processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java: ########## @@ -361,10 +364,48 @@ public void testWithLastCompactionState() .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) .shardSpec(getShardSpec(7)) .size(0) - .build(); + .build(); Assert.assertEquals(segment1, segment2.withLastCompactionState(compactionState)); } + @Test + public void testAnnotateWithLastCompactionState() + { + final CompactionState compactionState = new CompactionState( + new DynamicPartitionsSpec(null, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")), + ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")), + Collections.singletonMap("test", "map"), + Collections.singletonMap("test2", "map2") + ); + + final Function<Set<DataSegment>, Set<DataSegment>> annotateFn = CompactionState.compactionStateAnnotateFunction( + new DynamicPartitionsSpec(null, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + 
ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")), + ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")), Review Comment: Assign these values to variables and reuse the variables in the two places for clarity that the two are actually the same. ########## processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java: ########## @@ -361,10 +364,48 @@ public void testWithLastCompactionState() .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) .shardSpec(getShardSpec(7)) .size(0) - .build(); + .build(); Assert.assertEquals(segment1, segment2.withLastCompactionState(compactionState)); } + @Test + public void testAnnotateWithLastCompactionState() Review Comment: Thanks for adding this test! ########## extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java: ########## @@ -853,6 +950,15 @@ public void testReplaceAllOverEternitySegment(String contextName, Map<String, Ob new Object[]{946771200000L, 2.0f} ) ) + .setExpectedLastCompactionState( + expectedCompactionState( + context, + Collections.emptyList(), + Collections.singletonList(new FloatDimensionSchema( + "m1")), Review Comment: ```suggestion Collections.singletonList(new FloatDimensionSchema("m1")), ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + 
DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } + } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( + MSQControllerTask task, + ObjectMapper jsonMapper, + DataSchema dataSchema, + ShardSpec shardSpec, + String queryId + ) + { + PartitionsSpec partitionSpec; + + if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE) + || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) { + List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( Review Comment: Since we are using `DimensionRangePartitionsSpec` for single-dim segments, is it possible that segments partitioned by single-dim would get re-picked by compaction if compaction config has `single` as the desired state? I am not entirely sure if we still allow users to use `single` in the compaction config. 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } + } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( + MSQControllerTask task, + ObjectMapper jsonMapper, + DataSchema dataSchema, + ShardSpec shardSpec, + String queryId + ) + { + PartitionsSpec partitionSpec; + + if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE) + || 
Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) { + List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + task.getQuerySpec().getTuningConfig().getRowsPerSegment(), Review Comment: Assign `task.getQuerySpec().getTuningConfig()` to some variable as it is used in several places. That would help with the readability of this whole method. ########## extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java: ########## @@ -948,6 +1065,70 @@ public void testReplaceSegmentsInsertIntoNewTable(String contextName, Map<String new Object[]{978480000000L, 6.0f} ) ) + .setExpectedLastCompactionState( + expectedCompactionState( + context, + Collections.emptyList(), + Collections.singletonList(new FloatDimensionSchema("m1")), + GranularityType.ALL + ) + ) + .verifyResults(); + } + + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context) + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("m1", ColumnType.FLOAT) + .add("m2", ColumnType.DOUBLE) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foobar " + + "OVERWRITE ALL " + + "SELECT __time, m1, m2 " + + "FROM foo " + + "PARTITIONED by TIME_FLOOR(__time, 'P3M') ") + .setExpectedDataSource("foobar") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foobar", + Intervals.of( + "2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"), + "test", + 0 + ), + SegmentId.of( + "foobar", + Intervals.of( + "2001-01-01T00:00:00.000Z/2001-04-01T00:00:00.000Z"), + "test", + 0 + ) + ) + ) Review Comment: Formatting: ```suggestion .setExpectedSegment( ImmutableSet.of( SegmentId.of( "foobar", 
Intervals.of("2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"), "test", 0 ), ``` and so on. ########## extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java: ########## @@ -1641,4 +1846,48 @@ private List<Object[]> expectedFooRows() )); return expectedRows; } + + private CompactionState expectedCompactionState( + Map<String, Object> context, List<String> partitionDimensions, List<DimensionSchema> dimensions, Review Comment: Cleaner to have each arg on a separate line. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java: ########## @@ -583,7 +583,7 @@ public static boolean isGuaranteedRollup( return tuningConfig.isForceGuaranteedRollup(); } - public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction( + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( Review Comment: Nit: Why is this rename needed? This method also returns a `Function` same as `CompactionState.compactionStateAnnotateFunction`. Either way, I think the name of these functions should clarify what they do i.e. `addCompactionStateToSegments` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org