jihoonson commented on a change in pull request #11379:
URL: https://github.com/apache/druid/pull/11379#discussion_r665804864
##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,42 @@ public GroupByQuery toGroupByQuery()
grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
+    // in this case, the timestampResult optimization is not necessary
+    if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
+      return query;
+    }
+    Map<String, Object> theContext = plannerContext.getQueryContext();
+
+    Granularity queryGranularity = null;
+
+    if (!grouping.getDimensions().isEmpty()) {
+      for (DimensionExpression dimensionExpression : grouping.getDimensions()) {
+        Granularity granularity = Expressions.toQueryGranularity(
+            dimensionExpression.getDruidExpression(),
+            plannerContext.getExprMacroTable()
+        );
+        if (granularity == null) {
+          continue;
+        }
+        if (queryGranularity != null) {
+          // group by more than one timestamp_floor
+          // eg: group by timestamp_floor(__time to DAY), timestamp_floor(__time to HOUR)
+          queryGranularity = null;
+          break;
+        }
+        queryGranularity = granularity;
+        int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
+        theContext = new HashMap<>();
Review comment:
Thanks, but this is quite confusing. We should not overwrite variables
unless we have to.
##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,63 @@ public GroupByQuery toGroupByQuery()
grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
+    // in this case, the timestampResult optimization is not necessary
+    if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
+      return query;
+    }
+    Map<String, Object> theContext = plannerContext.getQueryContext();
Review comment:
```suggestion
final Map<String, Object> theContext = new HashMap<>();
```
##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,42 @@ public GroupByQuery toGroupByQuery()
grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
+    // in this case, the timestampResult optimization is not necessary
Review comment:
Hmm sorry, I don't quite understand. I'm not sure what the limit clause has
to do with this optimization.
> When `query.isApplyLimitPushDown()` is true, it means the limit is
calculated on the compute nodes first.
Yes, when `isApplyLimitPushDown` is true, the limit is applied in both the
data nodes and the broker. In the data nodes, the limit is applied when
per-segment results are merged. The broker applies the limit again when it
merges per-data-node results. For example, if you issued a groupBy query with
a limit of 10, each data node would return at most 10 rows, and the broker
would merge those per-data-node results and apply the limit again to finally
return only 10 rows.
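To make that merge behavior concrete, here is a minimal, self-contained Java sketch. This is not Druid code: `applyLimit` and `brokerMerge` are hypothetical helpers standing in for the per-node limit and the broker-side merge, and rows are modeled as plain integers.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class LimitPushDownSketch
{
  // a node (or the broker) keeps only the first `limit` rows of its sorted result
  static List<Integer> applyLimit(List<Integer> sortedRows, int limit)
  {
    return sortedRows.stream().limit(limit).collect(Collectors.toList());
  }

  // the broker merges already-limited per-node results, re-sorts, and limits again
  static List<Integer> brokerMerge(List<List<Integer>> perNodeResults, int limit)
  {
    List<Integer> merged = perNodeResults.stream()
        .flatMap(List::stream)
        .sorted(Comparator.naturalOrder())
        .collect(Collectors.toList());
    return applyLimit(merged, limit);
  }

  public static void main(String[] args)
  {
    int limit = 3;
    // each data node applies the limit to its own merged per-segment results
    List<Integer> node1 = applyLimit(Arrays.asList(1, 4, 7, 9, 12), limit);
    List<Integer> node2 = applyLimit(Arrays.asList(2, 3, 8, 10, 11), limit);
    // the broker merges the per-node results and applies the limit once more
    System.out.println(brokerMerge(Arrays.asList(node1, node2), limit)); // prints [1, 2, 3]
  }
}
```

The key point the sketch shows: because every node already truncated to the limit, the broker never sees more than `limit * numNodes` rows, and its final truncation yields exactly `limit` rows.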
> `select floor(__time to hour),dim1 from a_table group by floor(__time to
hour),dim1 limit 10`
> On a compute node, the number of rows produced with a limit when
granularity=ALL is different from the sum of the numbers of rows produced
with a limit in each hour.
> The `limit` should be applied to the whole result set.
I don't think the number of rows should be different in those cases. This
optimization modifies not just the granularity but also the dimensions. When
granularity is set to ALL, the dimensions should include the virtual column
for the time-floor function. When granularity is set to HOUR, the dimensions
should not include that virtual column. In these cases, the actual keys used
for grouping are effectively the same because the ALL granularity groups all
rows into one bucket. As a result, the number of rows must be the same in
both cases.
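As a sketch of why the grouping keys are effectively the same, here is a toy Java example. This is not Druid code: `hourFloor`, `groupAll`, and `groupPerHour` are made-up names, and rows are modeled as `{timestampMillis, dimValue}` pairs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GranularityRewriteSketch
{
  // floor a millisecond timestamp to the start of its hour
  static long hourFloor(long millis)
  {
    return millis - (millis % 3_600_000L);
  }

  // granularity=ALL: the hour floor stays as an explicit grouping key
  static Map<List<Object>, Long> groupAll(List<long[]> rows)
  {
    return rows.stream().collect(Collectors.groupingBy(
        r -> List.<Object>of(hourFloor(r[0]), r[1]),
        Collectors.counting()));
  }

  // granularity=HOUR: rows are bucketed per hour first, then grouped by the remaining dimension
  static Map<List<Object>, Long> groupPerHour(List<long[]> rows)
  {
    Map<List<Object>, Long> result = new HashMap<>();
    rows.stream()
        .collect(Collectors.groupingBy(r -> hourFloor(r[0])))
        .forEach((hour, bucketRows) ->
            bucketRows.stream()
                .collect(Collectors.groupingBy(r -> r[1], Collectors.counting()))
                .forEach((dim, count) -> result.put(List.<Object>of(hour, dim), count)));
    return result;
  }

  public static void main(String[] args)
  {
    long hour = 3_600_000L;
    List<long[]> rows = List.of(
        new long[]{10L, 1L}, new long[]{20L, 1L}, new long[]{30L, 2L},
        new long[]{hour + 10L, 1L}, new long[]{hour + 20L, 2L});
    // both plans produce exactly the same grouping keys and counts
    System.out.println(groupAll(rows).equals(groupPerHour(rows))); // prints true
  }
}
```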
> `select dim1,floor(__time to hour),dim2 from a_table group by
dim1,floor(__time to hour),dim2 limit 10`
> For this even more complicated case, if we replace granularity=ALL with
granularity=HOUR, we have to modify the `BufferGrouper` logic to adjust the
time_floor dimension value (and its position in the row) in the result row to
make it look the same as the result row when granularity=ALL, and not only
the value but also the order of rows after the `limit` is applied. I don't
think it is worth it, for either the logic complexity or the performance
gain.
What logic should we modify? The optimization you added is modifying the
native query that is passed all the way down to the data nodes which use the
`BufferGrouper`. Why does pushing down the limit make any difference in the
result signature of the data nodes?
> And for another reason, when the `limit` is pushed down to the compute
nodes, I think the whole computation should be fast enough, so the
timestampResult optimization is not going to improve the total performance
that much.
There is one performance issue with `LimitedBufferHashGrouper` that I'm
aware of that makes a groupBy with limitPushDown on even slower than the same
query with limitPushDown off. But even if there were no known performance
issues, query performance really differs case by case, so I wouldn't say that
query processing with limitPushDown is fast in the data nodes and that we
therefore need no further optimization.
> And the last reason: if we don't push down the `limit` to the compute
nodes but apply this optimization and do the limit on the broker, I guess the
total performance is degraded.
Hmm, I'm confused. Are you saying that we should push down the limit because
the performance will be worse otherwise? If so, I agree.
> Generally, the timestampResult optimization logic happens on broker, so I
disable this optimization when `query.isApplyLimitPushDown()` is true
What do you mean by "the timestampResult optimization logic happens on
broker"? The rewritten query will be passed down to the data nodes as far as I
can tell.
Just for clarification, my question is not why we don't optimize this case
now, but is whether your comment here is correct.
##########
File path: processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
##########
@@ -213,7 +216,48 @@ public boolean doMergeResults(final GroupByQuery query)
context.put("finalize", false);
context.put(GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2);
context.put(CTX_KEY_OUTERMOST, false);
-    if (query.getUniversalTimestamp() != null) {
+    Map<String, Object> timestampFieldContext = GroupByQueryHelper.findTimestampResultField(query);
+    context.putAll(timestampFieldContext);
+
+    Granularity granularity = query.getGranularity();
+    List<DimensionSpec> dimensionSpecs = query.getDimensions();
+    final String timestampResultField = (String) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD);
+    final boolean hasTimestampResultField = timestampResultField != null
+                                            && query.getContextBoolean(CTX_KEY_OUTERMOST, true);
+    int timestampResultFieldIndex = 0;
+    if (hasTimestampResultField) {
+      // sql like "group by city_id, time_floor(__time to day)":
+      // the original translated query is granularity=all and dimensions:[d0, d1];
+      // the better plan is granularity=day and dimensions:[d0],
+      // but the ResultRow structure is changed from [d0, d1] to [__time, d0];
+      // this structure should be fixed back to [d0, d1] (actually it is [d0, __time]) before postAggs are called
+      final Granularity timestampResultFieldGranularity
+          = (Granularity) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+      dimensionSpecs =
+          query.getDimensions()
+               .stream()
+               .filter(dimensionSpec -> !dimensionSpec.getOutputName().equals(timestampResultField))
+               .collect(Collectors.toList());
+      granularity = timestampResultFieldGranularity;
+      // when timestampResultField is the last dimension, sortByDimsFirst should be set to true;
+      // otherwise the downstream sorts by the row's timestamp first, which makes the final ordering not as expected
+      timestampResultFieldIndex = (int) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
+      if (!query.getContextSortByDimsFirst() && timestampResultFieldIndex == query.getDimensions().size() - 1) {
+        context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, true);
+      }
+      // when timestampResultField is the first dimension and sortByDimsFirst=true,
+      // it is effectively the same as sortByDimsFirst=false
+      if (query.getContextSortByDimsFirst() && timestampResultFieldIndex == 0) {
+        context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
+      }
+      // when hasTimestampResultField=true and timestampResultField is neither the first nor the last dimension,
+      // the DefaultLimitSpec will always do the reordering
+    }
Review comment:
Thank you for adding comments!
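For intuition on the sortByDimsFirst adjustment in the diff above, here is a toy Java sketch. This is not Druid code: the comparator names are made up, and rows are modeled as `{dim, hour}` pairs. It shows that a timestamp-first ordering interleaves dimension values when the time-floor expression is the last grouping dimension, so a limit applied in that order could keep different rows than the SQL-expected dims-first order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortByDimsFirstSketch
{
  // SQL result order for "GROUP BY dim, time_floor(__time to hour)": dim first, then hour
  static final Comparator<long[]> DIMS_FIRST =
      Comparator.<long[]>comparingLong(r -> r[0]).thenComparingLong(r -> r[1]);
  // default merge order once granularity is rewritten: the row timestamp first, then dim
  static final Comparator<long[]> TIME_FIRST =
      Comparator.<long[]>comparingLong(r -> r[1]).thenComparingLong(r -> r[0]);

  public static void main(String[] args)
  {
    // rows as {dim, hour}
    List<long[]> rows = new ArrayList<>(List.of(
        new long[]{1, 2}, new long[]{2, 1}, new long[]{1, 1}, new long[]{2, 2}));
    rows.sort(TIME_FIRST);
    // timestamp-first order interleaves dims: (1,1), (2,1), (1,2), (2,2)
    System.out.println(rows.get(1)[0] + "," + rows.get(1)[1]); // prints 2,1
    rows.sort(DIMS_FIRST);
    // dims-first order matches the SQL result order: (1,1), (1,2), (2,1), (2,2)
    System.out.println(rows.get(1)[0] + "," + rows.get(1)[1]); // prints 1,2
  }
}
```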
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]