cryptoe commented on code in PR #15965:
URL: https://github.com/apache/druid/pull/15965#discussion_r1546034348


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) 
queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> 
compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = 
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     
Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     
Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) 
task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or 
via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query 
[%s]", queryDef.getQueryId());
+        } else {
+
+          DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) 
queryKernel
+              
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
+          ShardSpec shardSpec = 
segments.stream().findFirst().get().getShardSpec();
+
+          compactionStateAnnotateFunction = 
prepareCompactionStateAnnotateFunction(
+              task(),
+              context.jsonMapper(),
+              dataSchema,
+              shardSpec,
+              queryDef.getQueryId()
+          );
+        }
+      }
+
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), 
segments.size());
-      publishAllSegments(segments);
+      publishAllSegments(compactionStateAnnotateFunction.apply(segments));
+    }
+  }
+
+  public static Function<Set<DataSegment>, Set<DataSegment>> 
prepareCompactionStateAnnotateFunction(
+      MSQControllerTask task,
+      ObjectMapper jsonMapper,
+      DataSchema dataSchema,
+      ShardSpec shardSpec,
+      String queryId
+  )
+  {
+    PartitionsSpec partitionSpec;
+
+    if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
+         || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
+      List<String> partitionDimensions = ((DimensionRangeShardSpec) 
shardSpec).getDimensions();
+      partitionSpec = new DimensionRangePartitionsSpec(
+          task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
+          null,
+          partitionDimensions,
+          false
+      );
+
+    } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+      partitionSpec = new 
DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(),
 null);
+    } else {
+      log.error(

Review Comment:
   I feel this should be an MSQ error, i.e., throw an MSQ Fault and fail the job, 
since if we do add new shardSpecs to MSQ, we should also add support to store the 
compaction state. If we do not add code here, the user's jobs would `pass` 
with only this error message in the logs. It would require a lot of debugging to 
figure out that we missed adding the handling here. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
       //noinspection unchecked
       @SuppressWarnings("unchecked")
       final Set<DataSegment> segments = (Set<DataSegment>) 
queryKernel.getResultObjectForStage(finalStageId);
+
+      Function<Set<DataSegment>, Set<DataSegment>> 
compactionStateAnnotateFunction = Function.identity();
+
+      boolean storeCompactionState = 
QueryContext.of(task.getQuerySpec().getQuery().getContext())
+                                                 .getBoolean(
+                                                     
Tasks.STORE_COMPACTION_STATE_KEY,
+                                                     
Tasks.DEFAULT_STORE_COMPACTION_STATE
+                                                 );
+
+      if (!segments.isEmpty() && storeCompactionState) {
+
+        DataSourceMSQDestination destination = (DataSourceMSQDestination) 
task.getQuerySpec().getDestination();
+        if (!destination.isReplaceTimeChunks()) {
+          // Only do this for replace queries, whether originating directly or 
via compaction
+          log.error("storeCompactionState flag set for a non-REPLACE query 
[%s]", queryDef.getQueryId());

Review Comment:
   ```suggestion
             log.warn("storeCompactionState flag set for a non-REPLACE query 
[%s]. Ignoring the flag for now.", queryDef.getQueryId());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to