LakshSingla commented on code in PR #14994:
URL: https://github.com/apache/druid/pull/14994#discussion_r1331928033
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1556,8 +1556,13 @@ private static QueryDefinition makeQueryDefinition(
       shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
       queryToPlan = querySpec.getQuery();
     } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
-      // we add a final stage which generates one partition per worker.
-      shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+
+      shuffleSpecFactory = (clusterBy, aggregate) ->
+          new GlobalSortTargetSizeShuffleSpec(
+              clusterBy,
+              MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()),
+              aggregate
+          );
Review Comment:
   Let's refactor this lambda into `ShuffleSpecFactories#globalSortWithTargetSize`,
   and change the other instance here
   https://github.com/apache/druid/blob/79f882f48c15eef77d627f1d31212d6c09cae315/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java#L1585
   to use that factory as well.
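A minimal, self-contained sketch of the suggested refactor. The nested types below are simplified stand-ins for the real Druid classes (`ClusterBy`, `GlobalSortTargetSizeShuffleSpec`, `ShuffleSpecFactory` carry more state in the actual codebase), and `globalSortWithTargetSize` is the proposed, not-yet-existing factory method:

```java
import java.util.List;

public class ShuffleSpecFactorySketch
{
  // Simplified stand-in for Druid's ClusterBy key definition.
  public record ClusterBy(List<String> columns) {}

  // Simplified stand-in for GlobalSortTargetSizeShuffleSpec.
  public record GlobalSortTargetSizeShuffleSpec(ClusterBy clusterBy, long targetSize, boolean aggregate) {}

  // Mirrors the (clusterBy, aggregate) -> spec lambda shape in the diff.
  @FunctionalInterface
  public interface ShuffleSpecFactory
  {
    GlobalSortTargetSizeShuffleSpec build(ClusterBy clusterBy, boolean aggregate);
  }

  // The suggested named factory: captures the target size in one place,
  // so both call sites in ControllerImpl can reuse it.
  public static ShuffleSpecFactory globalSortWithTargetSize(final long targetSize)
  {
    return (clusterBy, aggregate) ->
        new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
  }

  public static void main(String[] args)
  {
    ShuffleSpecFactory factory = globalSortWithTargetSize(100_000);
    GlobalSortTargetSizeShuffleSpec spec =
        factory.build(new ClusterBy(List.of("__time")), false);
    System.out.println(spec.targetSize()); // prints 100000
  }
}
```

The design point is that the target size becomes a parameter of a named factory rather than being captured ad hoc by a lambda at each call site.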
##########
docs/multi-stage-query/reference.md:
##########
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
 | `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
 | `selectDestination` | SELECT<br /><br />Controls where the final result of the SELECT query is written. <br />Use `taskReport` (the default) to write select results to the task report. <b>This is not scalable, since the task report size explodes for large result sets.</b> <br/>Use `durableStorage` to write results to a durable storage location. <b>For large result sets, it's recommended to use `durableStorage`.</b> To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |
+| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br /> This property comes into effect only when `selectDestination` is set to `durableStorage`. | 100000 |
Review Comment:
   Does `page` mean something to the end user reading the documentation? If
   not, I'd suggest:
   * Use commonly used cloud terminology in the explanation of the
   parameter, e.g. "the maximum number of rows that can be present in a single
   object stored in the durable storage".
   * Mention when this should be set by the user; in our case, one good use
   case would be when the file/object size exceeds the maximum allowable limit.
   * Perhaps change the context parameter name to something else, though I am
   fine with it as-is, since this shouldn't be used by the user very
   frequently.
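For context, a sketch of how a user would set this parameter. This assumes the standard MSQ query context mechanism (a `context` map submitted alongside the SQL); the `wikipedia` datasource and the query itself are purely illustrative:

```json
{
  "query": "SELECT channel, COUNT(*) AS cnt FROM wikipedia GROUP BY channel",
  "context": {
    "selectDestination": "durableStorage",
    "rowsPerPage": 100000
  }
}
```

Since `rowsPerPage` only takes effect with `selectDestination=durableStorage`, the two parameters would typically be set together.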
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]