LakshSingla commented on code in PR #16854:
URL: https://github.com/apache/druid/pull/16854#discussion_r1715261379


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########


Review Comment:
   The following comments are based on my understanding of the window query 
kit. They don't apply to the changes in this PR, but to code that was added 
recently, so we should try to fix them in this patch unless a fix is a 
functional change that requires tests and vetting:
   
   1. The `log.info` calls in the window query kit are out of place. We can 
remove them, as most of the data they provide can be derived from the task 
report. While they aren't logging anything excessive, they aren't providing 
any extra value. 
   2. `getOperatorListFromQuery` should be documented properly: what the 
inputs are, what processing is done on them, and what its outputs are. As it 
stands, it's a convoluted method that takes in a list of op factories and 
partitions them based on unknown criteria. A few examples of the input list 
and the resulting output partitions would clear that up for me.
   3. I think that the code for `getOperatorListFromQuery` can be improved and 
made self-documenting. There are one-off checks which don't seem to have a 
well-defined rationale to me. I am only vaguely familiar with window 
functions, so reading the query kit should make it clear how the data will 
flow through the different MSQ stages. 
   4. The above points apply equally to the method 
`findShuffleSpecForNextWindow`. I am unable to treat it like a black box 
because it isn't documented, and I am unable to figure out what it's trying to 
achieve (with my limited knowledge of window functions). A few questions pop 
up in my mind while reading the code. If possible, there should be 
assertions/checks against paths that are not expected, so that it's clear what 
the method expects as input: 
   a. Can there be multiple sort/partition operators in the input? What 
happens in that case, and which one takes precedence over the other and why? 
And if it can't happen, what prevents it from happening?
   b. If partition = null and sort != null, we still return null without 
accounting for sort. Is that correct? I feel we won't encounter that case. 
   c. What if a partition column isn't present in the sort columns? Can it 
happen? If it does, we should throw an error instead. 
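
To illustrate the kind of check meant in point 4c, here is a minimal, standalone sketch. It is not code from the PR: the class name `PartitionSortCheck`, the method name `checkPartitionColumnsAreSorted`, and the use of plain column-name strings are all assumptions made for illustration, and the real method would work with Druid's own operator and column types. It only checks containment, not whether the partition columns form a prefix of the sort columns.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSortCheck
{
  // Hypothetical helper, not part of the PR: fails fast when a partition
  // column is missing from the sort columns, instead of silently continuing.
  static void checkPartitionColumnsAreSorted(List<String> partitionColumns, List<String> sortColumns)
  {
    for (String column : partitionColumns) {
      if (!sortColumns.contains(column)) {
        throw new IllegalStateException(
            "Partition column [" + column + "] is not present in sort columns " + sortColumns
        );
      }
    }
  }

  public static void main(String[] args)
  {
    // Expected path: every partition column is also a sort column.
    checkPartitionColumnsAreSorted(Arrays.asList("d1"), Arrays.asList("d1", "d2"));

    // Unexpected path: "d3" is partitioned on but never sorted on.
    try {
      checkPartitionColumnsAreSorted(Arrays.asList("d3"), Arrays.asList("d1", "d2"));
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A check along these lines would also document the expectation for readers of the method, since the unexpected path becomes an explicit error rather than an unstated assumption.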



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -105,8 +108,20 @@ public QueryDefinition makeQueryDefinition(
 
     final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
     final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
-    final int maxRowsMaterialized;
 
+    // Get segment granularity from query context, and create ShuffleSpec and 
RowSignature to be used for the final window stage.
+    final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
+    final ClusterBy finalWindowClusterBy = 
QueryKitUtils.clusterByWithSegmentGranularity(ClusterBy.none(), 
segmentGranularity);
+    ShuffleSpec finalWindowStageShuffleSpec = 
resultShuffleSpecFactory.build(finalWindowClusterBy, false);
+    if (Objects.equals(segmentGranularity, Granularities.ALL)) {
+      finalWindowStageShuffleSpec = MixShuffleSpec.instance();
+    }

Review Comment:
   This seems wrong. We should still be using the `resultShuffleSpecFactory` to 
create the final shuffle spec. If the windowing stage is an input to the 
segment generator, then the window processor would create one large partition, 
whereas we expect the segments to be sized correctly. Am I missing the reason 
for this additional check? 
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -105,8 +108,20 @@ public QueryDefinition makeQueryDefinition(
 
     final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
     final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
-    final int maxRowsMaterialized;
 
+    // Get segment granularity from query context, and create ShuffleSpec and 
RowSignature to be used for the final window stage.
+    final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
+    final ClusterBy finalWindowClusterBy = 
QueryKitUtils.clusterByWithSegmentGranularity(ClusterBy.none(), 
segmentGranularity);

Review Comment:
   Why is this `ClusterBy.none()`? I am not certain that it's wrong, but we 
should mention the reason why the final stage doesn't sort by anything apart 
from time. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

