As nested query execution is implemented today, the innermost query is
executed on the historical nodes while the outer queries are executed on the
broker node. This can get problematic when the inner query groups on a high
cardinality dimension, returning too many records for the broker node to
handle. One of the options that we have been internally testing and exploring
is the capability to push down the complete nested query to the historical
nodes. Each historical node then executes the nested query against only the
segments it owns for that query. Because the number of records returned by
each historical node would potentially be much smaller in this case, the final
merge and aggregation would be less intensive for the broker. The broker,
though, won't need to perform any further dimension or segment level
filtering, since that is taken care of on the historical nodes themselves.
Note that this way of distributing the aggregation to the historical nodes
doesn't always return the same results as performing the final aggregation on
the broker node. However, there is a good set of cases (for example,
aggregating on dimensions that are used for hashing during ingestion) where
this kind of push down logic will return the right results. I can get into
this in more detail, but the general idea was to leave the onus on the user to
figure out whether their data layout allows for this kind of push down.
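
To make the shape of such a query concrete, a nested groupBy of roughly the
following form is what gets pushed down. This is only a sketch: the datasource,
dimension names, and intervals are made up, and the context key shown is merely
illustrative of the context variable described below:

    {
      "queryType": "groupBy",
      "dataSource": {
        "type": "query",
        "query": {
          "queryType": "groupBy",
          "dataSource": "events",
          "granularity": "day",
          "dimensions": ["user_id"],
          "aggregations": [ { "type": "count", "name": "event_count" } ],
          "intervals": ["2018-09-01/2018-10-01"]
        }
      },
      "granularity": "all",
      "dimensions": [],
      "aggregations": [
        { "type": "longSum", "name": "total_events", "fieldName": "event_count" }
      ],
      "intervals": ["2018-09-01/2018-10-01"],
      "context": { "forcePushDownNestedQuery": true }
    }

Here the inner query groups on the high cardinality user_id dimension; with
push down enabled, each historical runs both the inner and outer query over its
own segments, so the broker only merges already-aggregated per-node results.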

This implementation provides the user a way of forcing nested query push down
through a query context variable. The next cut will focus on doing this
automatically under the following conditions (credit to @gianm for clearly
articulating them):

1) The groupBy query granularity is equal to, or finer than, the segment
granularity; and either:

2a) A time chunk uses HashBasedNumberedShardSpec, partitionDimensions is 
nonempty, the grouping dimension set contains all of the shard 
partitionDimensions, and there are no "extension" partitions (partitions with 
partitionNum >= partitions, which are created by ingest tasks that append data)

or:

2b) A time chunk uses SingleDimensionShardSpec and the grouping dimension set 
contains the shard dimension.

If Druid detects this, it should push down the query automatically for that
time chunk. There will be situations where the query can be pushed down for
some time chunks but not others (for example: consider a data pipeline that
loads data unpartitioned in realtime, and later has a reindexing job to
partition historical data by some useful dimension). In this case, ideally the
broker should be capable of pushing down the query for the time chunks where
it can be correctly pushed down, while not pushing it down for the others.
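
For reference, condition 2a corresponds to data that was ingested with a
hashed partitionsSpec listing the grouping dimensions. A sketch of such a
tuningConfig fragment (the dimension name and target size are made up):

    "partitionsSpec": {
      "type": "hashed",
      "targetPartitionSize": 5000000,
      "partitionDimensions": ["user_id"]
    }

Segments created this way use HashBasedNumberedShardSpec hashed on user_id, so
within a time chunk every row for a given user_id value lands in exactly one
segment, which is what makes pushing down a groupBy on user_id safe.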

TODO: support "subtotalsSpec" for pushed-down nested queries.

[ Full content available at: 
https://github.com/apache/incubator-druid/pull/6410 ]