cryptoe commented on code in PR #16699:
URL: https://github.com/apache/druid/pull/16699#discussion_r1685996610
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1782,53 +1742,14 @@ private static QueryDefinition makeQueryDefinition(
}
if (MSQControllerTask.isIngestion(querySpec)) {
- final RowSignature querySignature =
queryDef.getFinalStageDefinition().getSignature();
- final ClusterBy queryClusterBy =
queryDef.getFinalStageDefinition().getClusterBy();
-
- // Find the stage that provides shuffled input to the final
segment-generation stage.
- StageDefinition finalShuffleStageDef =
queryDef.getFinalStageDefinition();
-
- while (!finalShuffleStageDef.doesShuffle()
- &&
InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) {
- finalShuffleStageDef = queryDef.getStageDefinition(
-
Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()))
- );
- }
-
- if (!finalShuffleStageDef.doesShuffle()) {
- finalShuffleStageDef = null;
- }
-
- // Add all query stages.
- // Set shuffleCheckHasMultipleValues on the stage that serves as input
to the final segment-generation stage.
- final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
-
- for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
- if (stageDef.equals(finalShuffleStageDef)) {
-
builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true));
- } else {
- builder.add(StageDefinition.builder(stageDef));
- }
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
querySpec.getDestination();
+ TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec();
+ if (terminalStageSpec instanceof SegmentGenerationStageSpec) {
+ return ((SegmentGenerationStageSpec)
terminalStageSpec).constructFinalStage(queryId, queryDef, querySpec,
jsonMapper);
Review Comment:
We can inject the jsonMapper.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1782,53 +1742,14 @@ private static QueryDefinition makeQueryDefinition(
}
if (MSQControllerTask.isIngestion(querySpec)) {
- final RowSignature querySignature =
queryDef.getFinalStageDefinition().getSignature();
- final ClusterBy queryClusterBy =
queryDef.getFinalStageDefinition().getClusterBy();
-
- // Find the stage that provides shuffled input to the final
segment-generation stage.
- StageDefinition finalShuffleStageDef =
queryDef.getFinalStageDefinition();
-
- while (!finalShuffleStageDef.doesShuffle()
- &&
InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) {
- finalShuffleStageDef = queryDef.getStageDefinition(
-
Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()))
- );
- }
-
- if (!finalShuffleStageDef.doesShuffle()) {
- finalShuffleStageDef = null;
- }
-
- // Add all query stages.
- // Set shuffleCheckHasMultipleValues on the stage that serves as input
to the final segment-generation stage.
- final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
-
- for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
- if (stageDef.equals(finalShuffleStageDef)) {
-
builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true));
- } else {
- builder.add(StageDefinition.builder(stageDef));
- }
+ DataSourceMSQDestination destination = (DataSourceMSQDestination)
querySpec.getDestination();
+ TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec();
+ if (terminalStageSpec instanceof SegmentGenerationStageSpec) {
+ return ((SegmentGenerationStageSpec)
terminalStageSpec).constructFinalStage(queryId, queryDef, querySpec,
jsonMapper);
Review Comment:
Can this be an interface method, so that we pass only the query def?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1192,11 +1176,16 @@ private Int2ObjectMap<Object>
makeWorkerFactoryInfosForStage(
{
if (MSQControllerTask.isIngestion(querySpec) &&
stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) {
- // noinspection unchecked,rawtypes
- return (Int2ObjectMap)
makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
- } else {
- return null;
+ final DataSourceMSQDestination destination = (DataSourceMSQDestination)
querySpec.getDestination();
+ TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec();
+ if (destination.getTerminalStageSpec() instanceof
SegmentGenerationStageSpec) {
+ return (Int2ObjectMap) ((SegmentGenerationStageSpec)
terminalStageSpec).makeSegmentGeneratorWorkerFactoryInfos(
Review Comment:
```suggestion
return (Int2ObjectMap) ((SegmentGenerationStageSpec)
terminalStageSpec).getWorkerInfo(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]