sreemanamala commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1675753366
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -65,13 +69,23 @@ public QueryDefinition makeQueryDefinition(
int minStageNumber
)
{
- // need to validate query first
- // populate the group of operators to be processed as each stage
- // the size of the operators is the number of serialized stages
- // later we should also check if these can be parallelized
- // check there is an empty over clause or not
+ // Need to validate query first.
+ // Populate the group of operators to be processed at each stage.
+ // The size of the operators is the number of serialized stages.
+ // Later we should also check if these can be parallelized.
+ // Check if there is an empty OVER() clause or not.
List<List<OperatorFactory>> operatorList = new ArrayList<>();
- boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList);
+ RowSignature rowSignature = originalQuery.getRowSignature();
+ log.info("Row signature received for query is [%s].", rowSignature);
+
+ boolean isEmptyOverPresent = originalQuery.getOperators()
Review Comment:
A window without a partition key but with an ORDER BY key, `window (ORDER BY
expr)`, would still make `isEmptyOverPresent` true, in which case the logic of
moving everything to a single partition is still correct. So we can probably
just change the variable name and the related comments.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -125,28 +139,53 @@ public QueryDefinition makeQueryDefinition(
queryToRun.getOperators(),
rowSignature,
true,
- maxRowsMaterialized
+ maxRowsMaterialized,
+ new ArrayList<>()
))
);
} else {
- // there are multiple windows present in the query
- // Create stages for each window in the query
- // These stages will be serialized
- // the partition by clause of the next window will be the shuffle key for the previous window
+ // There are multiple windows present in the query.
+ // Create stages for each window in the query.
+ // These stages will be serialized.
+ // The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
- final int numberOfWindows = operatorList.size();
- final int baseSize = rowSignature.size() - numberOfWindows;
- for (int i = 0; i < baseSize; i++) {
- bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+ RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+ log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+ for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+ bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
- for (int i = 0; i < numberOfWindows; i++) {
- bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+ List<String> partitionColumnNames = new ArrayList<>();
+
+ /*
+ operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+ to be used for a different window stage.
+
+ We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+ */
+ for (int i = 0; i < operatorList.size(); i++) {
+ for (OperatorFactory operatorFactory : operatorList.get(i)) {
+ if (operatorFactory instanceof WindowOperatorFactory) {
+ List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+ // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+ // since they need to be present in the row signature for this window stage.
+ for (String columnName : outputColumnNames) {
+ int indexInRowSignature = rowSignature.indexOf(columnName);
+ if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
Review Comment:
In which case the output column name of a particular operator would already
be part of the builder? Wouldn't each operator have a different output column
name?
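To make the question concrete, here is a small sketch of how the per-stage columns and shuffle key evolve across window stages. `Window`, `Stage`, and `plan` are hypothetical; it tracks column names only (no types, no `rowSignature` filter) and assumes the last stage has an empty shuffle key. If the stage columns are kept in a `LinkedHashSet`, re-adding an existing name is simply a no-op, which would also avoid calling `bob.build()` inside the loop:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class WindowStagePlanSketch {
  // Hypothetical per-window info: its PARTITION BY columns and the columns it outputs.
  static final class Window {
    final List<String> partitionColumns;
    final List<String> outputColumns;

    Window(List<String> partitionColumns, List<String> outputColumns) {
      this.partitionColumns = partitionColumns;
      this.outputColumns = outputColumns;
    }
  }

  // Hypothetical per-stage result: the stage's column list and its shuffle key.
  static final class Stage {
    final List<String> columns;
    final List<String> shuffleKey;

    Stage(List<String> columns, List<String> shuffleKey) {
      this.columns = columns;
      this.shuffleKey = shuffleKey;
    }
  }

  static List<Stage> plan(List<String> inputColumns, List<Window> windows) {
    // Insertion-ordered set: adding a column that is already present is a no-op,
    // so no separate "already in the builder?" lookup is needed.
    Set<String> columns = new LinkedHashSet<>(inputColumns);
    List<Stage> stages = new ArrayList<>();
    for (int i = 0; i < windows.size(); i++) {
      columns.addAll(windows.get(i).outputColumns);
      // The next window's PARTITION BY columns become this stage's shuffle key;
      // the last stage gets an empty key (an assumption of this sketch).
      List<String> shuffleKey =
          i + 1 < windows.size() ? windows.get(i + 1).partitionColumns : new ArrayList<String>();
      stages.add(new Stage(new ArrayList<>(columns), shuffleKey));
    }
    return stages;
  }
}
```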
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -177,12 +177,12 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
*
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
*
- * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
+ * 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
Review Comment:
nit: can correct the typo here as well (`we will me` should be `we will be`)
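Related to the same javadoc: the "keep size of frame to a value" idea could look roughly like the following. It is a sketch with partitions reduced to plain row counts and a hypothetical `maxRowsPerFrame` limit: consecutive partitions are packed into one frame until the limit would be exceeded, and a partition larger than the limit still gets a frame of its own:

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionBatcher {
  // Groups consecutive partition sizes into frames so each frame holds at most
  // maxRowsPerFrame rows, unless a single partition alone exceeds that limit.
  static List<List<Integer>> batch(List<Integer> partitionSizes, int maxRowsPerFrame) {
    List<List<Integer>> frames = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    int rowsInCurrent = 0;
    for (int size : partitionSizes) {
      // Start a new frame if adding this partition would overflow a non-empty frame.
      if (!current.isEmpty() && rowsInCurrent + size > maxRowsPerFrame) {
        frames.add(current);
        current = new ArrayList<>();
        rowsInCurrent = 0;
      }
      current.add(size);
      rowsInCurrent += size;
    }
    if (!current.isEmpty()) {
      frames.add(current);
    }
    return frames;
  }
}
```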
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -182,12 +244,10 @@ public QueryDefinition makeQueryDefinition(
}
/**
- *
* @param originalQuery
* @param operatorList
- * @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise
*/
- private boolean ifEmptyOverPresentInWindowOperstors(
+ private void populateOperatorListFromQuery(
Review Comment:
This could now be `getOperatorListFromQuery` and return the operator list, I
guess, since we are passing a new `ArrayList` instance in the variable anyway.
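Something along these lines is what I mean. Operators are modeled as plain strings, and the grouping rule (close a group after each operator whose name starts with "window") is only an assumption for illustration, not necessarily what `populateOperatorListFromQuery` does today:

```java
import java.util.ArrayList;
import java.util.List;

final class OperatorGrouping {
  // Hypothetical grouping rule: each group ends right after an operator whose name
  // starts with "window"; any trailing operators form a final group.
  static List<List<String>> getOperatorListFromQuery(List<String> operators) {
    List<List<String>> operatorList = new ArrayList<>();
    List<String> currentGroup = new ArrayList<>();
    for (String operator : operators) {
      currentGroup.add(operator);
      if (operator.startsWith("window")) {
        operatorList.add(currentGroup);
        currentGroup = new ArrayList<>();
      }
    }
    if (!currentGroup.isEmpty()) {
      operatorList.add(currentGroup);
    }
    // Returning the list means the caller no longer allocates one and passes it in.
    return operatorList;
  }
}
```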
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -484,37 +482,36 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
frameRowsAndCols.add(ldrc);
}
- private List<Integer> findPartitionColumns(RowSignature rowSignature)
- {
- List<Integer> indexList = new ArrayList<>();
- for (OperatorFactory of : operatorFactoryList) {
- if (of instanceof NaivePartitioningOperatorFactory) {
- for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
- indexList.add(rowSignature.indexOf(s));
- }
- }
- }
- return indexList;
- }
-
/**
- *
- * Compare two rows based only the columns in the partitionIndices
- * In case the parition indices is empty or null compare entire row
- *
+ * Compare two rows based on the columns in partitionColumnNames.
+ * If the partitionColumnNames is empty or null, compare entire row.
+ * <p>
+ * For example, say:
+ * <ul>
+ * <li>partitionColumnNames = ["d1", "d2"]</li>
+ * <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
+ * <li>frameReader.signature.indexOf("d1") = 0</li>
+ * <li>frameReader.signature.indexOf("d2") = 1</li>
+ * <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
+ * <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
+ * </ul>
+ * <p>
+ * Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
+ * Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*/
- private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
+ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
Review Comment:
nit: `comparePartitionKeys` sounds like a comparator (which would return an
`int`). Maybe rename it?
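For example, a boolean-style name such as `isSamePartition` (just a suggestion) reads naturally at call sites. Below is a self-contained sketch of the behavior described in the javadoc, with rows modeled as `Object[]` and the signature as a list of column names instead of `ResultRow`/`RowSignature`; it assumes every partition column exists in the signature:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class PartitionRowMatcher {
  // Returns true if both rows have equal values in every partition column, meaning they
  // can go into the same partition. With no partition columns, the entire rows are compared.
  // Assumes every partition column is present in the signature (indexOf != -1).
  static boolean isSamePartition(
      Object[] row1, Object[] row2, List<String> signature, List<String> partitionColumnNames) {
    if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
      return Arrays.equals(row1, row2);
    }
    for (String column : partitionColumnNames) {
      int index = signature.indexOf(column);
      if (!Objects.equals(row1[index], row2[index])) {
        return false;
      }
    }
    return true;
  }
}
```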
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -102,18 +116,18 @@ public QueryDefinition makeQueryDefinition(
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final int maxRowsMaterialized;
- RowSignature rowSignature = queryToRun.getRowSignature();
+
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW))
{
- maxRowsMaterialized = (int) originalQuery.context()
- .get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
+ maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
}
-
- if (isEmptyOverFound) {
+ if (isEmptyOverPresent) {
// empty over clause found
// moving everything to a single partition
+ // TODO: This logic needs to be revamped and corrected in the future.
+ // This should likely cause issues for cases where we have a mix of empty over() and non-empty over().
Review Comment:
If this is the case, shouldn't we throw an exception stating that we don't
support it yet when we find both empty and non-empty windows?
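A minimal sketch of that validation, with each window reduced to its PARTITION BY column list (empty for `OVER ()` or `OVER (ORDER BY ...)`). `WindowMixValidator` is a hypothetical name, and `UnsupportedOperationException` is only a stand-in for whichever Druid error type fits best here:

```java
import java.util.List;

final class WindowMixValidator {
  // Rejects queries that mix windows without a partition key and windows with one,
  // since the TODO suggests the single-partition path may not handle that mix.
  static void validateNoMixedWindows(List<List<String>> partitionColumnsPerWindow) {
    boolean hasUnpartitioned = false;
    boolean hasPartitioned = false;
    for (List<String> partitionColumns : partitionColumnsPerWindow) {
      if (partitionColumns.isEmpty()) {
        hasUnpartitioned = true;
      } else {
        hasPartitioned = true;
      }
    }
    if (hasUnpartitioned && hasPartitioned) {
      throw new UnsupportedOperationException(
          "Mixing windows with an empty OVER() and windows with PARTITION BY is not supported yet");
    }
  }
}
```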
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]