Akshat-Jain commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1678825723
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -125,28 +139,53 @@ public QueryDefinition makeQueryDefinition(
queryToRun.getOperators(),
rowSignature,
true,
- maxRowsMaterialized
+ maxRowsMaterialized,
+ new ArrayList<>()
))
);
} else {
- // there are multiple windows present in the query
- // Create stages for each window in the query
- // These stages will be serialized
- // the partition by clause of the next window will be the shuffle key for the previous window
+ // There are multiple windows present in the query.
+ // Create stages for each window in the query.
+ // These stages will be serialized.
+ // The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
- final int numberOfWindows = operatorList.size();
- final int baseSize = rowSignature.size() - numberOfWindows;
- for (int i = 0; i < baseSize; i++) {
- bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+ RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+ log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+ for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+ bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
- for (int i = 0; i < numberOfWindows; i++) {
- bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+ List<String> partitionColumnNames = new ArrayList<>();
+
+ /*
+ operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+ to be used for a different window stage.
+
+ We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+ */
+ for (int i = 0; i < operatorList.size(); i++) {
+ for (OperatorFactory operatorFactory : operatorList.get(i)) {
+ if (operatorFactory instanceof WindowOperatorFactory) {
+ List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+ // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+ // since they need to be present in the row signature for this window stage.
+ for (String columnName : outputColumnNames) {
+ int indexInRowSignature = rowSignature.indexOf(columnName);
+ if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
Review Comment:
@sreemanamala
> So, we are trying to be cautious here.

Yeah. We could maybe get rid of it once the entire flow has had enough soak
time. But I like the idea of throwing an error instead of letting the query
run through an unexpected scenario.

> Instead of ISE, we could use a DruidException and the error message would
> give column value like w0. (might not be very helpful)

I saw ISE being thrown in other query kits, so I used that for consistency.
A column value like w0 should be helpful, since we are also logging the entire
row signature, as well as which columns are getting added to the row signature
for every stage.
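
To make the fail-fast idea concrete, here is a minimal, self-contained sketch of the kind of guard being discussed. It is not the code from this PR. It assumes plain `List<String>` signatures in place of Druid's `RowSignature`, uses `IllegalStateException` as a stand-in for Druid's `ISE`, and the class name, method name, and error message wording are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: plain lists stand in for RowSignature, and
// IllegalStateException stands in for Druid's ISE.
class WindowSignatureSketch
{
  /**
   * Returns the stage signature extended with the window output columns.
   * A column is added only if it exists in the full signature and is not
   * already part of the stage signature; anything else is treated as an
   * unexpected scenario and fails fast.
   */
  static List<String> addWindowOutputColumns(
      List<String> fullSignature,
      List<String> stageSignature,
      List<String> outputColumnNames
  )
  {
    List<String> result = new ArrayList<>(stageSignature);
    for (String columnName : outputColumnNames) {
      if (fullSignature.contains(columnName) && !result.contains(columnName)) {
        result.add(columnName);
      } else {
        // Include the column name (e.g. w0) and both signatures, so the
        // message can be matched against the logged row signatures.
        throw new IllegalStateException(
            String.format(
                "Found unexpected column [%s]. Full signature: %s, stage signature so far: %s.",
                columnName,
                fullSignature,
                result
            )
        );
      }
    }
    return result;
  }
}
```

With this shape, a column name such as w0 in the exception message can be cross-checked against the row signature that is logged for each stage.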
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]