cryptoe commented on code in PR #15965: URL: https://github.com/apache/druid/pull/15965#discussion_r1546034348
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); + } else { + + DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel + .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); + + ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + + compactionStateAnnotateFunction = prepareCompactionStateAnnotateFunction( + task(), + context.jsonMapper(), + dataSchema, + shardSpec, + queryDef.getQueryId() + ); + } + } + log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(compactionStateAnnotateFunction.apply(segments)); + } + } + + public static Function<Set<DataSegment>, Set<DataSegment>> prepareCompactionStateAnnotateFunction( + MSQControllerTask task, + ObjectMapper jsonMapper, + DataSchema dataSchema, + ShardSpec shardSpec, + String queryId + ) + { + PartitionsSpec partitionSpec; + + if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE) + || 
Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) { + List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + task.getQuerySpec().getTuningConfig().getRowsPerSegment(), + null, + partitionDimensions, + false + ); + + } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { + partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null); + } else { + log.error( Review Comment: I feel this should be an MSQ error, i.e., throw an MSQ Fault and fail the job, since if we do add new shardSpecs to MSQ, we should also add support for storing the compaction state. If we do not add code here, the user's jobs would `pass` with this error message in the logs. It would require a lot of debugging to figure out that we missed adding stuff here. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded( //noinspection unchecked @SuppressWarnings("unchecked") final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId); + + Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity(); + + boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext()) + .getBoolean( + Tasks.STORE_COMPACTION_STATE_KEY, + Tasks.DEFAULT_STORE_COMPACTION_STATE + ); + + if (!segments.isEmpty() && storeCompactionState) { + + DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); + if (!destination.isReplaceTimeChunks()) { + // Only do this for replace queries, whether originating directly or via compaction + log.error("storeCompactionState flag set for a non-REPLACE query [%s]", queryDef.getQueryId()); Review Comment: ```suggestion log.warn("storeCompactionState flag set for a 
non-REPLACE query [%s]. Ignoring the flag for now.", queryDef.getQueryId()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org