gianm commented on code in PR #13458:
URL: https://github.com/apache/druid/pull/13458#discussion_r1039174947


##########
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>

Review Comment:
   Is this meant to be supplanted by an `operator` query at some point? Would both live side by side?



##########
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java:
##########
@@ -0,0 +1,163 @@
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>

Review Comment:
   What are the preconditions for the data from the `dataSource` of this query? Would make good javadoc.



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java:
##########
@@ -0,0 +1,102 @@
+package org.apache.druid.query.rowsandcols.column;
+
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+
+/**
+ * Allows for accessing a column, provides methods to enable cell-by-cell access.
+ */
+public interface ColumnAccessor
+{
+  /**
+   * Get the type of the Column
+   * @return the type of the Column
+   */
+  ColumnType getType();
+
+  /**
+   * Get the number of cells
+   * @return the number of cells
+   */
+  int numCells();

Review Comment:
   I get why this is called a cell, but it would make more sense to me if it was called a row.



##########
processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.druid.query.operator;
+
+import org.apache.druid.query.operator.window.Processor;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+public class WindowProcessorOperator implements Operator

Review Comment:
   What are the expectations for the data coming out of the child operator? (I'm asking for javadoc 🙂)



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -331,15 +331,12 @@ private DataSource inlineIfNecessary(
           current = Iterables.getOnlyElement(current.getChildren());
         }
 
-        assert !(current instanceof QueryDataSource); // lgtm [java/contradictory-type-checks]

Review Comment:
   Can you change these to non-assert checks instead of deleting them? This isn't perf-sensitive code.
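
   A minimal sketch of what a non-assert check could look like, assuming the intent is to fail loudly with an `IllegalStateException` (Druid code would more likely use its own `ISE` helper). The `DataSourceStub`, `TableDataSourceStub`, and `QueryDataSourceStub` types and the `InlineChecks` class are hypothetical stand-ins so the example compiles on its own. Unlike `assert`, this check runs whether or not the JVM was started with `-ea`.

   ```java
   // Hypothetical stand-ins for Druid's DataSource hierarchy, so the sketch compiles on its own.
   interface DataSourceStub {}

   final class TableDataSourceStub implements DataSourceStub {}

   final class QueryDataSourceStub implements DataSourceStub {}

   final class InlineChecks
   {
     /**
      * Explicit replacement for `assert !(current instanceof QueryDataSource)`.
      * Always runs, regardless of the -ea JVM flag, and returns its argument for chaining.
      */
     static DataSourceStub checkNotQueryDataSource(DataSourceStub current)
     {
       if (current instanceof QueryDataSourceStub) {
         throw new IllegalStateException(
             "Expected a non-query datasource here, but got " + current.getClass().getSimpleName()
         );
       }
       return current;
     }
   }
   ```

   Since this code is not perf-sensitive, the cost of an always-on `instanceof` check is negligible compared to the benefit of a clear error instead of a silently skipped assertion.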



##########
processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.druid.query.operator;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.DefaultSortedGroupPartitioner;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.SortedGroupPartitioner;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This naive partitioning operator assumes that its child operator always gives it RowsAndColumns objects that are
+ * a superset of the partitions that it needs to provide.  It will never attempt to make a partition larger than a
+ * single RowsAndColumns object that it is given from its child Operator.  A different operator should be used
+ * if that is an important bit of functionality to have.

Review Comment:
   Any preconditions? Do things need to be sorted any particular way?
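
   To illustrate why ordering can matter here, a minimal sketch of sorted-group partitioning, assuming (as the name `SortedGroupPartitioner` suggests, though the code shown does not confirm it) that rows arrive sorted, or at least clustered, by the partition key. The partitioner only compares each row with its predecessor, so a key that reappears later in unsorted input starts a second, separate partition. `PartitionBoundaries` and `computeBoundaries` are hypothetical names.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class PartitionBoundaries
   {
     /**
      * Returns the start offset of each partition, followed by a final entry equal to keys.length,
      * so partition i covers [result.get(i), result.get(i + 1)).
      * Assumes rows with equal keys are contiguous: it only compares each row with the previous one.
      */
     static List<Integer> computeBoundaries(int[] keys)
     {
       final List<Integer> boundaries = new ArrayList<>();
       for (int i = 0; i < keys.length; i++) {
         if (i == 0 || keys[i] != keys[i - 1]) {
           boundaries.add(i); // key changed, so a new partition starts here
         }
       }
       boundaries.add(keys.length); // sentinel end offset
       return boundaries;
     }
   }
   ```

   With keys `{1, 1, 2, 2, 2, 3}` this yields three partitions, while `{1, 2, 1}` yields three partitions with key `1` split in two, which is the kind of precondition worth stating in the javadoc.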



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java:
##########
@@ -58,24 +59,28 @@
   private final Project aggregateProject;
   private final Sort sort;
   private final Project sortProject;
+  private final Window window;
 
   public enum Stage
   {
     // SCAN must be present on all queries.
     SCAN,
 
-    // WHERE_FILTER, SELECT_PROJECT may be present on any query.
+    // WHERE_FILTER, SELECT_PROJECT may be present on any query, except ones with WINDOW.
     WHERE_FILTER,
     SELECT_PROJECT,
 
-    // AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on aggregating queries.
+    // AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on non-WINDOW aggregating queries.

Review Comment:
   HAVING_FILTER (spelling)



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery(
       sorting = null;
     }
 
+    if (partialQuery.getWindow() != null) {
+      final QueryContext queryContext = plannerContext.queryContext();
+      if (queryContext.getBoolean("windowsAreForClosers", false)) {
+        windowing = Preconditions.checkNotNull(
+            Windowing.fromCalciteStuff(
+                partialQuery,
+                plannerContext,
+                sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW
+                rexBuilder
+            )
+        );
+      } else {
+        plannerContext.setPlanningError("Windowing Not Currently Supported");

Review Comment:
   I notice that this error message does not actually get shown. That's a bummer, and I think it has to do with the fact that this planning error system, which was added in #11911, surfaces errors from some path that was explored, and that may not be the most intuitive path to pull the error from.
   
   There is a legit reason for this: sometimes a query that isn't plannable in one path _will_ be plannable in another path. So we don't always want to fail fast.
   
   I think the real answer here is to fail in the validation stage. By the time this code is running, we're in the optimization stage, and Calcite assumes that by the time a query gets to optimization, it's going to be able to run. That's why failing during optimization causes it to go so bonkers. We do have custom validation logic we added for INSERT and REPLACE; perhaps a similar approach could work for validating the kinds of window functions (and joins, too) that we support. No need to do it in this patch, since this patch isn't really making things any worse or better in this regard. But it would be a nice future change.



##########
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java:
##########
@@ -0,0 +1,163 @@
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
+{
+  private final RowSignature rowSignature;
+  private final List<OperatorFactory> operators;
+
+  @JsonCreator
+  public WindowOperatorQuery(
+      @JsonProperty("dataSource") DataSource dataSource,
+      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,

Review Comment:
   I am reminded that the equivalent MSQ concept here is `InputSpec`, where the `TableInputSpec` combines datasource, intervals, and filter. I wonder if it would make sense to use that here.



##########
core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java:
##########
@@ -55,8 +55,15 @@
  * Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner,
  * potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is
  * called.
+ *
+ * This interface extends {@code Comparator<Object>} instead of {@code Comparator<T>} because trying to specialize the
+ * type of the comparison method can run into issues for comparators of objects that can sometimes be of a different
+ * java class type.  For example, {@code Comparator<Long>} cannot accept Integer objects in its comparison method
+ * and there is no easy way for this interface definition to allow {@code TypeStrategy<Long>} to actually be a
+ * {@code Comparator<Number>}.  So, we fall back to effectively erasing the generic type and having them all be
+ * {@code Comparator<Object>}.
  */
-public interface TypeStrategy<T> extends Comparator<T>
+public interface TypeStrategy<T> extends Comparator<Object>

Review Comment:
   I get why you did it this way, but it's a bummer, since nothing really stops the inexorable Objectification of the Ts in this interface, except moving away from Jackson serde of plain `Object` and `Object[]` and `List<Object>`. That's the real issue. I think we'll be able to move away from that as we go to frame and frame channel RPC, so I am okay with this for that reason.
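
   A small illustration of the problem the javadoc describes. When Jackson deserializes a JSON integer into a plain `Object`, it produces an `Integer` if the value fits in 32 bits and a `Long` otherwise (unless `DeserializationFeature.USE_LONG_FOR_INTS` is enabled), so a "long" column can easily contain both classes. A `Comparator<Long>` forced to see an `Integer` fails at runtime, while an `Object`-typed comparator can normalize first. `LongComparators` is a hypothetical helper, not Druid's `TypeStrategy`.

   ```java
   import java.util.Comparator;

   final class LongComparators
   {
     /** Typed comparator: only safe when both arguments really are Longs. */
     static final Comparator<Long> TYPED = Long::compare;

     /**
      * Object-typed comparator, in the spirit of TypeStrategy extending Comparator<Object>:
      * accepts any Number (Long, Integer, ...) and compares by long value.
      * Assumes non-null arguments.
      */
     static final Comparator<Object> ERASED =
         (a, b) -> Long.compare(((Number) a).longValue(), ((Number) b).longValue());

     /** Returns true if pushing an Integer through the typed comparator throws ClassCastException. */
     @SuppressWarnings({"unchecked", "rawtypes"})
     static boolean typedRejectsInteger()
     {
       final Comparator<Object> raw = (Comparator) TYPED;
       try {
         raw.compare(1L, 2); // the second argument autoboxes to an Integer, not a Long
         return false;
       }
       catch (ClassCastException e) {
         return true;
       }
     }
   }
   ```

   The raw cast mimics what happens when generic types are erased at a serde boundary: the compiler can no longer catch the mismatch, so it only surfaces as a runtime `ClassCastException`.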



##########
sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java:
##########
@@ -86,10 +87,16 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
             PartialDruidQuery.Stage.SORT_PROJECT,
             PartialDruidQuery::withSortProject
         ),
+        new DruidQueryRule<>(

Review Comment:
   Could you adjust things so this rule isn't added unless `windowsAreForClosers: true`? That will limit the impact if it turns out that something is wrong with the rule.
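
   One way to do this, sketched with hypothetical stand-ins (`PlannerRules`, plain strings instead of Calcite `RelOptRule` objects, and a boolean instead of reading the query context): build the existing rules unconditionally and append the window rules only when the flag is set, so that with the flag off the planner sees exactly the rule set it had before. The same pattern would apply to `DruidOuterQueryRule.WINDOW`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   final class PlannerRules
   {
     // Hypothetical rule names standing in for the real RelOptRule instances.
     private static final List<String> BASE_RULES =
         Collections.unmodifiableList(Arrays.asList("SORT_PROJECT", "OUTER_AGGREGATE", "OUTER_SORT"));

     /**
      * Builds the rule list. Window rules are appended only when window functions are enabled,
      * so with the flag off the planner sees exactly the pre-existing rules.
      */
     static List<String> rules(boolean windowFunctionsEnabled)
     {
       final List<String> rules = new ArrayList<>(BASE_RULES);
       if (windowFunctionsEnabled) {
         rules.add("WINDOW");
         rules.add("OUTER_WINDOW");
       }
       return rules;
     }
   }
   ```

   Gating at rule-registration time, rather than only inside the rule's logic, means a bug in the new rule cannot affect queries that never opted in.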



##########
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java:
##########
@@ -0,0 +1,163 @@
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
+{
+  private final RowSignature rowSignature;
+  private final List<OperatorFactory> operators;
+
+  @JsonCreator
+  public WindowOperatorQuery(
+      @JsonProperty("dataSource") DataSource dataSource,
+      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,

Review Comment:
   Is it too wild to consider combining DataSource and QuerySegmentSpec as part of this effort? Once I really internalized what they each mean, it started to feel weird that they're separate.



##########
processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java:
##########
@@ -0,0 +1,126 @@
+package org.apache.druid.segment;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.rowsandcols.ArrayListRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+
+/**
+ * A {@link Segment} that is based on a stream of objects.
+ */
+public class ArrayListSegment<RowType> implements Segment

Review Comment:
   If you wanted to, you could merge this into `RowBasedSegment`, since you can get the underlying ArrayList out of the Sequence. The code looks something like this.
   
   ```
       if (rows instanceof SimpleSequence) {
         final Iterable<RowType> rowIterable = ((SimpleSequence<RowType>) rows).getIterable();
   
         if (rowIterable instanceof ArrayList) {
           // do cool stuff
         }
       }
   ```



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery(
       sorting = null;
     }
 
+    if (partialQuery.getWindow() != null) {
+      final QueryContext queryContext = plannerContext.queryContext();
+      if (queryContext.getBoolean("windowsAreForClosers", false)) {
+        windowing = Preconditions.checkNotNull(
+            Windowing.fromCalciteStuff(
+                partialQuery,
+                plannerContext,
+                sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW

Review Comment:
   I think this would be fine as a regular comment. It's saying that it's ok to use the `sourceRowSignature` here, because PartialDruidQuery (which enforces ordering of operators) only allows WINDOW to apply on top of SCAN. And SCAN is where the `sourceRowSignature` comes from.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -200,6 +201,11 @@ public Iterable<Object[]> getRows()
     return rows;
   }
 
+  public boolean rowsAreArrayList()

Review Comment:
   Is ArrayList better than List here?
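
   One reason the distinction matters: many common `List` implementations are not `java.util.ArrayList`. `Arrays.asList` returns a private fixed-size list class, and `Collections.unmodifiableList` returns a wrapper, so an `instanceof ArrayList` check rejects both even though they support the same indexed access. The sketch below is hypothetical (it is not `InlineDataSource`'s code) and assumes the caller only needs fast random-access reads, which the standard `RandomAccess` marker interface advertises.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.RandomAccess;

   final class RowContainerChecks
   {
     /** Narrow check: only java.util.ArrayList itself (or a subclass) qualifies. */
     static boolean isArrayList(Iterable<?> rows)
     {
       return rows instanceof ArrayList;
     }

     /** Broader check: any List that advertises fast indexed access qualifies. */
     static boolean isRandomAccessList(Iterable<?> rows)
     {
       return rows instanceof List && rows instanceof RandomAccess;
     }
   }
   ```

   If the code only relies on `get(int)` and `size()`, the broader check accepts more inputs without giving up performance.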



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -870,6 +911,12 @@ public Query<?> getQuery()
    */
   private Query<?> computeQuery()
   {
+    // TODO(gianm): structure

Review Comment:
   I think with this comment I was thinking that the code could be structured better. But that was a while ago. I'm over it.



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery(
       sorting = null;
     }
 
+    if (partialQuery.getWindow() != null) {
+      final QueryContext queryContext = plannerContext.queryContext();
+      if (queryContext.getBoolean("windowsAreForClosers", false)) {
+        windowing = Preconditions.checkNotNull(
+            Windowing.fromCalciteStuff(
+                partialQuery,
+                plannerContext,
+                sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW
+                rexBuilder
+            )
+        );
+      } else {
+        plannerContext.setPlanningError("Windowing Not Currently Supported");

Review Comment:
   People that try to do window functions should ideally see a nicer message. How about something like:
   
   > Window functions (OVER) are not supported



##########
sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java:
##########
@@ -93,7 +93,12 @@ public static RexNode fromFieldAccess(
   )
   {
     if (project == null) {
-      // I don't think the factory impl matters here.
+      // Gian doesn't think the factory impl matters here, he's likely correct.  But, upon reading what this is doing,
+      // we are re-building the list of things in the RelDataType for every single call to `fromFieldAccess`.
+      // `fromFieldAccess` is called pretty regularly in pretty low-level areas of the code, so it would make sense
+      // that we are perhaps re-creating the exact same object over and over and over and over again and wasting CPU

Review Comment:
   Re-doing the same work over and over is something of a theme in the way the planning code is structured. It would definitely be good to go through all that and improve it.
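
   A minimal memoization sketch for the pattern described in the diff comment, assuming the rebuilt object depends only on its input, has sensible `equals`/`hashCode`, and is safe to share. `Memoizer` is a hypothetical helper, not part of Druid or Calcite, and a real cache would also need an eviction story.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   final class Memoizer<K, V>
   {
     private final Map<K, V> cache = new ConcurrentHashMap<>();
     private final Function<K, V> compute;

     Memoizer(Function<K, V> compute)
     {
       this.compute = compute;
     }

     /**
      * Computes the value for a key at most once and reuses it afterwards.
      * Keys and computed values must be non-null (ConcurrentHashMap does not store nulls).
      */
     V get(K key)
     {
       return cache.computeIfAbsent(key, compute);
     }
   }
   ```

   The trade-off is memory for CPU: this only pays off if the same inputs really do recur, which the diff comment suggests but does not measure.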



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery(
       sorting = null;
     }
 
+    if (partialQuery.getWindow() != null) {
+      final QueryContext queryContext = plannerContext.queryContext();
+      if (queryContext.getBoolean("windowsAreForClosers", false)) {

Review Comment:
   ```
   public static final String CTX_ENABLE_WINDOW_FUNCTIONS = "windowsAreForClosers";
   ```
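
   A sketch of how such a constant might be declared and read, assuming the flag defaults to `false` and may arrive either as a JSON boolean or as a string. `WindowingContext` and `windowFunctionsEnabled` are hypothetical names working on a plain `Map`; in Druid the lookup itself would go through `QueryContext.getBoolean`, as the diff already does.

   ```java
   import java.util.Map;

   final class WindowingContext
   {
     /** Context key that turns on window-function planning. */
     public static final String CTX_ENABLE_WINDOW_FUNCTIONS = "windowsAreForClosers";

     /** Reads the flag, treating a missing key as false and accepting Boolean or String values. */
     static boolean windowFunctionsEnabled(Map<String, Object> context)
     {
       final Object value = context.get(CTX_ENABLE_WINDOW_FUNCTIONS);
       if (value instanceof Boolean) {
         return (Boolean) value;
       }
       return value != null && Boolean.parseBoolean(value.toString());
     }
   }
   ```

   Referencing the constant everywhere (in `DruidQuery` and in `DruidRules`) keeps the rule gating and the planning check from drifting apart if the key is ever renamed.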



##########
sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java:
##########
@@ -86,10 +87,16 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
             PartialDruidQuery.Stage.SORT_PROJECT,
             PartialDruidQuery::withSortProject
         ),
+        new DruidQueryRule<>(
+            Window.class,
+            PartialDruidQuery.Stage.WINDOW,
+            PartialDruidQuery::withWindow
+        ),
         DruidOuterQueryRule.AGGREGATE,
         DruidOuterQueryRule.WHERE_FILTER,
         DruidOuterQueryRule.SELECT_PROJECT,
         DruidOuterQueryRule.SORT,
+        DruidOuterQueryRule.WINDOW,

Review Comment:
   Same with this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
