adarshsanjeev commented on code in PR #16699:
URL: https://github.com/apache/druid/pull/16699#discussion_r1714136112


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1782,53 +1742,14 @@ private static QueryDefinition makeQueryDefinition(
     }
 
     if (MSQControllerTask.isIngestion(querySpec)) {
-      final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();
-      final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
-
-      // Find the stage that provides shuffled input to the final segment-generation stage.
-      StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
-
-      while (!finalShuffleStageDef.doesShuffle()
-             && InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) {
-        finalShuffleStageDef = queryDef.getStageDefinition(
-            Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()))
-        );
-      }
-
-      if (!finalShuffleStageDef.doesShuffle()) {
-        finalShuffleStageDef = null;
-      }
-
-      // Add all query stages.
-      // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage.
-      final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
-
-      for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
-        if (stageDef.equals(finalShuffleStageDef)) {
-          builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true));
-        } else {
-          builder.add(StageDefinition.builder(stageDef));
-        }
+      DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
+      TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec();
+      if (terminalStageSpec instanceof SegmentGenerationStageSpec) {
+        return ((SegmentGenerationStageSpec) terminalStageSpec).constructFinalStage(queryId, queryDef, querySpec, jsonMapper);

Review Comment:
   We would also need the querySpec for its tuningConfig. Passing the
jsonMapper here would also require SegmentGenerationStageSpec to stop being a
singleton class and to have a jsonMapper injected instead.
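
   To make the trade-off concrete, here is a rough sketch of the two shapes
under discussion: a stateless singleton that receives the mapper as a method
argument, and an instance that gets the mapper injected through its
constructor. Everything in it is hypothetical and for illustration only:
`Mapper` stands in for the real jsonMapper type, and `SingletonStageSpec` and
`InjectedStageSpec` are made-up names, not the classes in this PR.

```java
// Hypothetical stand-in for the real JSON mapper, so the sketch compiles on its own.
interface Mapper {
    String write(Object value);
}

// Shape A: a stateless singleton. Collaborators arrive as method arguments,
// so a single shared instance can serve every caller.
final class SingletonStageSpec {
    private static final SingletonStageSpec INSTANCE = new SingletonStageSpec();

    private SingletonStageSpec() {
    }

    static SingletonStageSpec instance() {
        return INSTANCE;
    }

    String constructFinalStage(String queryId, Mapper mapper) {
        return "final-stage:" + mapper.write(queryId);
    }
}

// Shape B: the mapper is injected at construction time. The method signature
// gets shorter, but each instance now carries state, so it can no longer be
// a plain static singleton.
final class InjectedStageSpec {
    private final Mapper mapper;

    InjectedStageSpec(Mapper mapper) {
        this.mapper = mapper;
    }

    String constructFinalStage(String queryId) {
        return "final-stage:" + mapper.write(queryId);
    }
}

class StageSpecSketch {
    public static void main(String[] args) {
        // Toy mapper that just wraps the value in double quotes.
        Mapper mapper = value -> "\"" + value + "\"";

        String fromSingleton = SingletonStageSpec.instance().constructFinalStage("q1", mapper);
        String fromInjected = new InjectedStageSpec(mapper).constructFinalStage("q1");

        // Both shapes produce the same result; they differ only in how the mapper is supplied.
        System.out.println(fromSingleton.equals(fromInjected));
    }
}
```

   Both shapes compute the same thing; the difference is only where the
mapper comes from and whether the spec object can remain a shared singleton.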



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

