kgyrtkirk commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1680626079
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -201,41 +267,46 @@ private boolean ifEmptyOverPresentInWindowOperstors(
operatorFactoryList = new ArrayList<>();
} else if (of instanceof NaivePartitioningOperatorFactory) {
if (((NaivePartitioningOperatorFactory)
of).getPartitionColumns().isEmpty()) {
+ // TODO: This logic need to be revamped in the future. We probably
don't need to handle empty over() cases separately.
Review Comment:
Please don't leave `TODO`s in the final version of the PR.
It's fine to use them while you work on the PR to mark places you intend to
get back to (I use the same approach with `FIXME` tags).
If at some point it's something you will not address in the actual patch, you
could create an issue and remove the `TODO`.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -201,41 +267,46 @@ private boolean ifEmptyOverPresentInWindowOperstors(
operatorFactoryList = new ArrayList<>();
} else if (of instanceof NaivePartitioningOperatorFactory) {
if (((NaivePartitioningOperatorFactory)
of).getPartitionColumns().isEmpty()) {
+ // TODO: This logic need to be revamped in the future. We probably
don't need to handle empty over() cases separately.
operatorList.clear();
operatorList.add(originalQuery.getOperators());
- return true;
+ return operatorList;
}
}
}
- return false;
+ return operatorList;
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory>
operatorFactories, int maxWorkerCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
- List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
} else if (of instanceof NaiveSortOperatorFactory) {
sort = (NaiveSortOperatorFactory) of;
}
}
- Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+
+ Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
- colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+ sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
- assert partition != null;
- if (partition.getPartitionColumns().isEmpty()) {
+
+ if (partition == null || partition.getPartitionColumns().isEmpty()) {
+ // If operatorFactories doesn't have any partitioning factory, then we
should keep the shuffle spec from previous stage.
+ // This indicates that we already have the data partitioned correctly,
and hence we don't need to do any shuffling.
return null;
}
+
+ List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
- if (colMap.containsKey(partitionColumn)) {
- if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
+ if (sortColumnsMap.containsKey(partitionColumn)) {
Review Comment:
I don't think this should be allowed to return false - why isn't there an
exception in the `else` branch?
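
A minimal sketch of the fail-fast variant asked about here. It assumes that
every partition column is expected to appear among the sort columns; the class
`PartitionColumnLookupSketch`, its `Direction` enum and the `directionFor`
helper are hypothetical stand-ins, not Druid's classes:

```java
import java.util.Map;

// Hypothetical stand-in types; not part of Druid.
final class PartitionColumnLookupSketch {
  enum Direction { ASC, DESC }

  // Returns the sort direction of a partition column, or throws if the
  // column is missing instead of silently taking an `else` branch.
  static Direction directionFor(String partitionColumn, Map<String, Direction> sortColumnsMap) {
    Direction direction = sortColumnsMap.get(partitionColumn);
    if (direction == null) {
      throw new IllegalStateException(
          "Partition column [" + partitionColumn + "] is missing from sort columns "
          + sortColumnsMap.keySet()
      );
    }
    return direction;
  }
}
```

Whether an `IllegalStateException` or one of the project's own exception types
fits better is left open here.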
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -201,41 +267,46 @@ private boolean ifEmptyOverPresentInWindowOperstors(
operatorFactoryList = new ArrayList<>();
} else if (of instanceof NaivePartitioningOperatorFactory) {
if (((NaivePartitioningOperatorFactory)
of).getPartitionColumns().isEmpty()) {
+ // TODO: This logic need to be revamped in the future. We probably
don't need to handle empty over() cases separately.
operatorList.clear();
operatorList.add(originalQuery.getOperators());
- return true;
+ return operatorList;
}
}
}
- return false;
+ return operatorList;
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory>
operatorFactories, int maxWorkerCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
- List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
} else if (of instanceof NaiveSortOperatorFactory) {
sort = (NaiveSortOperatorFactory) of;
}
}
- Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+
+ Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
- colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+ sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
- assert partition != null;
- if (partition.getPartitionColumns().isEmpty()) {
+
+ if (partition == null || partition.getPartitionColumns().isEmpty()) {
+ // If operatorFactories doesn't have any partitioning factory, then we
should keep the shuffle spec from previous stage.
+ // This indicates that we already have the data partitioned correctly,
and hence we don't need to do any shuffling.
Review Comment:
note: if they do - then why would there be a need to make another stage?
I guess this will only be true when the first operators of the window query
could avoid sorting/partitioning because a group-by query has already done it
for them?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet
readableInputs)
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
- } else if (comparePartitionKeys(outputRow, currentRow,
partitionColsIndex)) {
+ } else if (comparePartitionKeys(outputRow, currentRow,
partitionColumnNames)) {
Review Comment:
I meant that in the `else` branch there is a call to `runAllOpsOnSingleRac`,
which starts processing an operator list - but that gets destroyed after
the frame is processed, and a new one is built for the next rac...
As a `rac` in this case could mean even a single row, that makes it a bit
inefficient, since setup/shutdown cost is added to every processed `rac`.
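
To illustrate the cost pattern described above, here is a small sketch that
counts how often a pipeline is set up under both approaches. The `Pipeline`
class and both methods are hypothetical and do not model Druid's operators;
whether the real operator chain can be reused across racs is an assumption:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of per-rac setup cost; not Druid code.
final class RacPipelineSketch {
  static final class Pipeline implements AutoCloseable {
    Pipeline(AtomicInteger setups) {
      setups.incrementAndGet(); // stands in for the setup cost
    }

    void process(List<Integer> rac) {
      // the actual work on the rows of one rac would happen here
    }

    @Override
    public void close() {
      // stands in for the shutdown cost
    }
  }

  // One pipeline per rac: setup/shutdown is paid for every rac.
  static int setupsWhenRebuiltPerRac(List<List<Integer>> racs) {
    AtomicInteger setups = new AtomicInteger();
    for (List<Integer> rac : racs) {
      try (Pipeline pipeline = new Pipeline(setups)) {
        pipeline.process(rac);
      }
    }
    return setups.get();
  }

  // One pipeline for all racs: setup/shutdown is paid once.
  static int setupsWhenShared(List<List<Integer>> racs) {
    AtomicInteger setups = new AtomicInteger();
    try (Pipeline pipeline = new Pipeline(setups)) {
      for (List<Integer> rac : racs) {
        pipeline.process(rac);
      }
    }
    return setups.get();
  }
}
```

With single-row racs the first method pays the setup cost once per row, which
is the inefficiency the comment points at.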
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -162,6 +207,28 @@ public QueryDefinition makeQueryDefinition(
);
}
+ log.info("Using row signature [%s] for window stage.",
stageRowSignature);
+
+ boolean partitionOperatorExists = false;
+ List<String> currentPartitionColumns = new ArrayList<>();
+ for (OperatorFactory of : operatorList.get(i)) {
+ if (of instanceof NaivePartitioningOperatorFactory) {
+ for (String s : ((NaivePartitioningOperatorFactory)
of).getPartitionColumns()) {
+ currentPartitionColumns.add(s);
+ partitionOperatorExists = true;
+ }
+ }
+ }
+
+ if (partitionOperatorExists) {
+ partitionColumnNames = currentPartitionColumns;
+ }
+
+ log.info(
+ "Columns which would be used to define partitioning boundaries for
this window stage are [%s]",
+ partitionColumnNames
+ );
Review Comment:
of course!
I would recommend separating these as much as possible - many small PRs
tend to get reviewed faster, and because of their size the feedback is usually
also much better!
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -201,41 +267,46 @@ private boolean ifEmptyOverPresentInWindowOperstors(
operatorFactoryList = new ArrayList<>();
} else if (of instanceof NaivePartitioningOperatorFactory) {
if (((NaivePartitioningOperatorFactory)
of).getPartitionColumns().isEmpty()) {
+ // TODO: This logic need to be revamped in the future. We probably
don't need to handle empty over() cases separately.
operatorList.clear();
operatorList.add(originalQuery.getOperators());
- return true;
+ return operatorList;
}
}
}
- return false;
+ return operatorList;
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory>
operatorFactories, int maxWorkerCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
- List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
} else if (of instanceof NaiveSortOperatorFactory) {
sort = (NaiveSortOperatorFactory) of;
}
}
- Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+
+ Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
- colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+ sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
- assert partition != null;
- if (partition.getPartitionColumns().isEmpty()) {
+
+ if (partition == null || partition.getPartitionColumns().isEmpty()) {
+ // If operatorFactories doesn't have any partitioning factory, then we
should keep the shuffle spec from previous stage.
+ // This indicates that we already have the data partitioned correctly,
and hence we don't need to do any shuffling.
return null;
}
+
+ List<KeyColumn> keyColsOfWindow = new ArrayList<>();
Review Comment:
it seems like there should be a method which translates
`sort.getSortColumns()` to `keyColsOfWindow`
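
A rough sketch of what such a translation method could look like. The types
`SortColumn`, `KeyCol`, `Direction` and `KeyOrder` below are simplified,
hypothetical stand-ins for the real classes, and the method name
`toKeyColumns` is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types; the real Druid classes differ.
final class SortToKeyColumnsSketch {
  enum Direction { ASC, DESC }
  enum KeyOrder { ASCENDING, DESCENDING }

  static final class SortColumn {
    final String column;
    final Direction direction;

    SortColumn(String column, Direction direction) {
      this.column = column;
      this.direction = direction;
    }
  }

  static final class KeyCol {
    final String name;
    final KeyOrder order;

    KeyCol(String name, KeyOrder order) {
      this.name = name;
      this.order = order;
    }
  }

  // Translates sort columns one-to-one into key columns, preserving order.
  static List<KeyCol> toKeyColumns(List<SortColumn> sortColumns) {
    List<KeyCol> keyColumns = new ArrayList<>();
    for (SortColumn sortColumn : sortColumns) {
      KeyOrder order = sortColumn.direction == Direction.ASC
                       ? KeyOrder.ASCENDING
                       : KeyOrder.DESCENDING;
      keyColumns.add(new KeyCol(sortColumn.column, order));
    }
    return keyColumns;
  }
}
```

Keeping the mapping in one method would leave the direction handling out of
the body of `findShuffleSpecForNextWindow`.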
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet
readableInputs)
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
- } else if (comparePartitionKeys(outputRow, currentRow,
partitionColsIndex)) {
+ } else if (comparePartitionKeys(outputRow, currentRow,
partitionColumnNames)) {
Review Comment:
totally agree - I've either missed the review, or more likely I hadn't
realized that the above is a possible alternative approach which could work
better
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]