gianm commented on code in PR #13902:
URL: https://github.com/apache/druid/pull/13902#discussion_r1130205214
##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -396,4 +434,59 @@ public int getConstantInt(int argPosition)
return ((Number) getConstantArgument(argPosition).getValue()).intValue();
}
}
+
+ /**
+ * Return a list of {@link ColumnWithDirection} corresponding to a {@link RelCollation}.
+ *
+ * @param collation collation
+ * @param sourceRowSignature signature of the collated rows
+ */
+ private static LinkedHashSet<ColumnWithDirection>
computeSortColumnsFromRelCollation(
+ final RelCollation collation,
+ final RowSignature sourceRowSignature
+ )
+ {
+ final LinkedHashSet<ColumnWithDirection> retVal = new LinkedHashSet<>();
+
+ for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+ ColumnWithDirection.Direction direction = null;
+
+ switch (fieldCollation.getDirection()) {
+ case ASCENDING:
+ case STRICTLY_ASCENDING:
+ direction = ColumnWithDirection.Direction.ASC;
+ break;
+
+ case DESCENDING:
+ case STRICTLY_DESCENDING:
+ direction = ColumnWithDirection.Direction.DESC;
+ break;
+ }
+
+ if (direction != null) {
+ final ColumnWithDirection columnWithDirection = new
ColumnWithDirection(
+ sourceRowSignature.getColumnName(fieldCollation.getFieldIndex()),
+ direction
+ );
+
+ retVal.add(columnWithDirection);
+ } else {
+ break;
+ }
+ }
+
+ return retVal;
+ }
+
+ /**
+ * Whether currentSort is a prefix of priorSort. (i.e., whether data sorted by priorSort is *also* sorted
+ * by currentSort.)
+ */
+ private static boolean sortMatches(
+ final List<ColumnWithDirection> priorSort,
+ final List<ColumnWithDirection> currentSort
+ )
+ {
+ return currentSort.size() <= priorSort.size() &&
currentSort.equals(priorSort.subList(0, currentSort.size()));
+ }
Review Comment:
Sure it's simpler, it's only one line vs, like, 5 😛
Anyway, I changed it.
##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -109,53 +113,67 @@
public static Windowing fromCalciteStuff(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
- final RowSignature rowSignature,
+ final RowSignature sourceRowSignature,
final RexBuilder rexBuilder
)
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(),
"window");
ArrayList<OperatorFactory> ops = new ArrayList<>();
- final List<String> expectedOutputColumns = new
ArrayList<>(rowSignature.getColumnNames());
- final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w",
rowSignature.getColumnNames());
+ final List<String> windowOutputColumns = new
ArrayList<>(sourceRowSignature.getColumnNames());
+ final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w",
sourceRowSignature.getColumnNames());
int outputNameCounter = 0;
+
+ // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
+ // we really need to.
+ List<String> priorPartitionColumns = null;
+ LinkedHashSet<ColumnWithDirection> priorSortColumns = new
LinkedHashSet<>();
+
+ final RelCollation priorCollation =
partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+ if (priorCollation != null) {
+ // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
+ // the initial sort operator if the rows were already in the desired order.
+ priorSortColumns = computeSortColumnsFromRelCollation(priorCollation,
sourceRowSignature);
+ }
+
for (int i = 0; i < window.groups.size(); ++i) {
- final WindowGroup group = new WindowGroup(window, window.groups.get(i),
rowSignature);
+ final WindowGroup group = new WindowGroup(window, window.groups.get(i),
sourceRowSignature);
- if (i > 0) {
- LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
- for (String partitionColumn : group.getPartitionColumns()) {
- sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
- }
- sortColumns.addAll(group.getOrdering());
+ final LinkedHashSet<ColumnWithDirection> sortColumns = new
LinkedHashSet<>();
+ for (String partitionColumn : group.getPartitionColumns()) {
+ sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
+ }
+ sortColumns.addAll(group.getOrdering());
+ // Add sorting and partitioning if needed.
+ if (!sortMatches(ImmutableList.copyOf(priorSortColumns),
ImmutableList.copyOf(sortColumns))) {
Review Comment:
I changed it to use iterators.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]