jihoonson commented on a change in pull request #11379:
URL: https://github.com/apache/druid/pull/11379#discussion_r665804864



##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,42 @@ public GroupByQuery toGroupByQuery()
         grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
+    // in this case, the timestampResult optimization is not necessary
+    if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
+      return query;
+    }
+    Map<String, Object> theContext = plannerContext.getQueryContext();
+
+    Granularity queryGranularity = null;
+
+    if (!grouping.getDimensions().isEmpty()) {
+      for (DimensionExpression dimensionExpression : grouping.getDimensions()) {
+        Granularity granularity = Expressions.toQueryGranularity(
+            dimensionExpression.getDruidExpression(),
+            plannerContext.getExprMacroTable()
+        );
+        if (granularity == null) {
+          continue;
+        }
+        if (queryGranularity != null) {
+          // group by more than one timestamp_floor
+          // eg: group by timestamp_floor(__time to DAY), timestamp_floor(__time to HOUR)
+          queryGranularity = null;
+          break;
+        }
+        queryGranularity = granularity;
+        int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
+        theContext = new HashMap<>();

Review comment:
       Thanks, but this is quite confusing. We should not overwrite variables unless we have to.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,63 @@ public GroupByQuery toGroupByQuery()
         grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
+    // in this case, the timestampResult optimization is not necessary
+    if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
+      return query;
+    }
+    Map<String, Object> theContext = plannerContext.getQueryContext();

Review comment:
       ```suggestion
       final Map<String, Object> theContext = new HashMap<>();
       ```

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -1016,6 +1017,42 @@ public GroupByQuery toGroupByQuery()
         grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
+    // in this case, the timestampResult optimization is not necessary

Review comment:
       Hmm sorry, I don't quite understand. I'm not sure the limit clause has anything to do with this optimization.
   
   > When `query.isApplyLimitPushDown()` is true, it means the limit is calculated on compute node first.
   
   Yes, when `isApplyLimitPushDown` is true, the limit is applied in both the 
data nodes and the broker. In data nodes, the limit is applied when per-segment 
results are merged. The broker applies the limit again when it merges 
per-data-node results. For example, let's say you issued a groupBy query with a 
limit of 10, each data node would return at most 10 rows and the broker would 
merge those per-data-node results and apply the limit again to finally return 
only 10 rows.
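   The merge-then-re-limit behavior described above can be sketched as follows. This is a hypothetical standalone illustration, not Druid code: `applyLimit` stands in for the per-node and broker-side limit application, and the row values are arbitrary sort keys.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;

   public class LimitPushDownSketch
   {
     // stand-in for applying an ORDER BY + LIMIT to a set of result rows
     static List<Integer> applyLimit(List<Integer> rows, int limit)
     {
       List<Integer> sorted = new ArrayList<>(rows);
       sorted.sort(Comparator.naturalOrder());
       return sorted.subList(0, Math.min(limit, sorted.size()));
     }

     public static void main(String[] args)
     {
       int limit = 10;
       // each data node limits its merged per-segment results to at most 10 rows
       List<Integer> node1 = applyLimit(List.of(5, 3, 8, 1, 9, 12, 7, 4, 11, 2, 6), limit);
       List<Integer> node2 = applyLimit(List.of(10, 14, 0, 13, 15), limit);

       // the broker merges the per-data-node results and applies the limit again
       List<Integer> merged = new ArrayList<>(node1);
       merged.addAll(node2);
       List<Integer> finalRows = applyLimit(merged, limit);

       System.out.println(finalRows.size()); // 10 rows, regardless of node count
     }
   }
   ```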
   
   > `select floor(__time to hour),dim1 from a_table group by floor(__time to hour),dim1 limit 10`
   > On compute node, the row number produced with limit when granularity=ALL is different with the sum of row number produced with limit in each hour.
   > The `limit` should be applied to the whole result rows.
   
   I don't think the number of rows should be different in those cases. This optimization modifies not just the granularity but also the dimensions. When granularity is set to ALL, the dimensions should include the virtual column for the timefloor function. When granularity is set to HOUR, the dimensions should not include that virtual column. In these cases, the actual keys used for grouping are effectively the same because the ALL granularity groups all rows into one bucket. As a result, the number of rows must be the same in both cases.
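   The key equivalence can be checked with a small standalone sketch (hypothetical, not Druid code): grouping every row on the key `(hourFloor(__time), dim1)` under a single ALL bucket yields the same number of groups as bucketing rows by hour first and grouping on `dim1` within each bucket.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class GranularityRewriteSketch
   {
     // stand-in for timestamp_floor(__time to HOUR), in milliseconds
     static long hourFloor(long timestampMillis)
     {
       return timestampMillis - (timestampMillis % 3_600_000L);
     }

     public static void main(String[] args)
     {
       // rows of (__time, dim1): two rows in hour 0, two rows in hour 1
       long[][] rows = {{0L, 1L}, {1_800_000L, 1L}, {3_600_000L, 1L}, {3_600_000L, 2L}};

       // granularity=ALL: one time bucket, grouping key = (hourFloor(__time), dim1)
       Set<List<Long>> keysAll = new HashSet<>();
       for (long[] r : rows) {
         keysAll.add(List.of(hourFloor(r[0]), r[1]));
       }

       // granularity=HOUR: bucket rows by hour, grouping key within a bucket = (dim1)
       Map<Long, Set<Long>> buckets = new HashMap<>();
       for (long[] r : rows) {
         buckets.computeIfAbsent(hourFloor(r[0]), k -> new HashSet<>()).add(r[1]);
       }
       int hourGroupCount = buckets.values().stream().mapToInt(Set::size).sum();

       // both plans produce the same number of result rows
       System.out.println(keysAll.size() + " == " + hourGroupCount);
     }
   }
   ```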
   
   > `select dim1,floor(__time to hour),dim2 from a_table group by dim1,floor(__time to hour),dim2 limit 10`
   > For this even more complicated case, if we replace granularity=ALL with granularity=HOUR, we have to modify the Buffer Grouper logic to adjust the time_floor dimension value(and the position in row) in the result row to make it looks the same with the result row when granularity=ALL, and not only the value but also the sequence in rows after the `limit` is applied. I don't think it is worth for both logic complexity or performance gain.
   
   What logic should we modify? The optimization you added is modifying the 
native query that is passed all the way down to the data nodes which use the 
`BufferGrouper`. Why does pushing down the limit make any difference in the 
result signature of the data nodes?
   
   > And for another reason, when the `limit` is pushed down to compute node, I think the whole computation should be fast enough, the timestampResult optimization is not going to be that effectively improve the total performance.
   
   There is one performance issue with `LimitedBufferHashGrouper` that I'm aware of that makes a groupBy with limitPushDown on even slower than the same query with limitPushDown off. But even if there were no known performance issue, query performance really varies case by case, so I wouldn't say that query processing with limitPushDown is fast in data nodes and thus needs no further optimization.
   
   > And the last reason, if we don't pushdown the `limit` to compute node and apply this optimization and do limit on broker, I guess the total performance is degraded.
   
   Hmm, I'm confused. Are you saying that we should push down the limit because 
the performance will be worse otherwise? If so, I agree.
   
   > Generally, the timestampResult optimization logic happens on broker, so I disable this optimization when `query.isApplyLimitPushDown()` is true
   
   What do you mean by "the timestampResult optimization logic happens on 
broker"? The rewritten query will be passed down to the data nodes as far as I 
can tell.
   
   Just for clarification, my question is not why we don't optimize this case 
now, but is whether your comment here is correct.

##########
File path: 
processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
##########
@@ -213,7 +216,48 @@ public boolean doMergeResults(final GroupByQuery query)
     context.put("finalize", false);
     context.put(GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2);
     context.put(CTX_KEY_OUTERMOST, false);
-    if (query.getUniversalTimestamp() != null) {
+    Map<String, Object> timestampFieldContext = GroupByQueryHelper.findTimestampResultField(query);
+    context.putAll(timestampFieldContext);
+
+    Granularity granularity = query.getGranularity();
+    List<DimensionSpec> dimensionSpecs = query.getDimensions();
+    final String timestampResultField = (String) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD);
+    final boolean hasTimestampResultField = timestampResultField != null
+                                            && query.getContextBoolean(CTX_KEY_OUTERMOST, true);
+    int timestampResultFieldIndex = 0;
+    if (hasTimestampResultField) {
+      // sql like "group by city_id, time_floor(__time to day)",
+      // the original translated query is granularity=all and dimensions:[d0, d1]
+      // the better plan is granularity=day and dimensions:[d0]
+      // but the ResultRow structure is changed from [d0, d1] to [__time, d0]
+      // this structure should be fixed as [d0, d1] (actually it is [d0, __time]) before postAggs are called
+      final Granularity timestampResultFieldGranularity
+          = (Granularity) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+      dimensionSpecs =
+          query.getDimensions()
+               .stream()
+               .filter(dimensionSpec -> !dimensionSpec.getOutputName().equals(timestampResultField))
+               .collect(Collectors.toList());
+      granularity = timestampResultFieldGranularity;
+      // when timestampResultField is the last dimension, should set sortByDimsFirst=true,
+      // otherwise the downstream is sorted by the row's timestamp first, which makes the final ordering not as expected
+      timestampResultFieldIndex = (int) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
+      if (!query.getContextSortByDimsFirst() && timestampResultFieldIndex == query.getDimensions().size() - 1) {
+        context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, true);
+      }
+      // when timestampResultField is the first dimension and sortByDimsFirst=true,
+      // it is effectively the same as sortByDimsFirst=false
+      if (query.getContextSortByDimsFirst() && timestampResultFieldIndex == 0) {
+        context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
+      }
+      // when hasTimestampResultField=true and timestampResultField is neither the first nor the last dimension,
+      // the DefaultLimitSpec will always do the reordering
+    }

Review comment:
       Thank you for adding comments!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


