gianm commented on code in PR #13458: URL: https://github.com/apache/druid/pull/13458#discussion_r1039174947
########## processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.RowSignature; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class WindowOperatorQuery extends BaseQuery<RowsAndColumns> Review Comment: Is this meant to be supplanted by an `operator` query at some point? Would both live side by side? 
########## processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.RowSignature; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class WindowOperatorQuery extends BaseQuery<RowsAndColumns> Review Comment: What are the preconditions for the data from the `dataSource` of this query? Would make good javadoc. 
########## processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; + +/** + * Allows for accessing a column, provides methods to enable cell-by-cell access. + */ +public interface ColumnAccessor +{ + /** + * Get the type of the Column + * @return the type of the Column + */ + ColumnType getType(); + + /** + * Get the number of cells + * @return the number of cells + */ + int numCells(); Review Comment: I get why this is called a cell, but it would make more sense to me if it was called a row. ########## processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import org.apache.druid.query.operator.window.Processor; +import org.apache.druid.query.rowsandcols.RowsAndColumns; + +public class WindowProcessorOperator implements Operator Review Comment: What are the expectations for the data coming out of the child operator? (I'm asking for javadoc 🙂) ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -331,15 +331,12 @@ private DataSource inlineIfNecessary( current = Iterables.getOnlyElement(current.getChildren()); } - assert !(current instanceof QueryDataSource); // lgtm [java/contradictory-type-checks] Review Comment: Can you change these to non-assert checks instead of deleting them? This isn't perf-sensitive code. ########## processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.rowsandcols.DefaultSortedGroupPartitioner; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.SortedGroupPartitioner; + +import java.util.Iterator; +import java.util.List; + +/** + * This naive partitioning operator assumes that it's child operator always gives it RowsAndColumns objects that are + * a superset of the partitions that it needs to provide. It will never attempt to make a partition larger than a + * single RowsAndColumns object that it is given from its child Operator. A different operator should be used + * if that is an important bit of functionality to have. Review Comment: Any preconditions? Do things need to be sorted any particular way? ########## sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java: ########## @@ -58,24 +59,28 @@ private final Project aggregateProject; private final Sort sort; private final Project sortProject; + private final Window window; public enum Stage { // SCAN must be present on all queries. SCAN, - // WHERE_FILTER, SELECT_PROJECT may be present on any query. + // WHERE_FILTER, SELECT_PROJECT may be present on any query, except ones with WINDOW. WHERE_FILTER, SELECT_PROJECT, - // AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on aggregating queries. + // AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on non-WINDOW aggregating queries. 
Review Comment: HAVING_FILTER (spelling) ########## sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java: ########## @@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery( sorting = null; } + if (partialQuery.getWindow() != null) { + final QueryContext queryContext = plannerContext.queryContext(); + if (queryContext.getBoolean("windowsAreForClosers", false)) { + windowing = Preconditions.checkNotNull( + Windowing.fromCalciteStuff( + partialQuery, + plannerContext, + sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW + rexBuilder + ) + ); + } else { + plannerContext.setPlanningError("Windowing Not Currently Supported"); Review Comment: I notice that this error message does not actually get shown. That's a bummer and I think it has to do with the fact that this planning error system, which was added in #11911, surfaces errors relating to some path that was explored, but may not be the most intuitive path to pull the error from. There is a legit reason for this: sometimes a query that isn't plannable in one path _will_ be plannable in another path. So we don't always want to fail fast. I think the real answer here is to fail in the validation stage. By the time this code is running, we're in the optimization stage, and Calcite assumes that by the time a query gets to optimization, it's going to be able to run. That's why failing during optimization causes it to go so bonkers. We do have custom validation logic we added for INSERT and REPLACE; perhaps a similar approach could work for validating the kinds of window functions (and joins, too) that we support. No need to do it in this patch, since this patch isn't really making things any worse or better in this regard. But it would be a nice future change. 
########## processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.RowSignature; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class WindowOperatorQuery extends BaseQuery<RowsAndColumns> +{ + private final RowSignature rowSignature; + private final List<OperatorFactory> operators; + + @JsonCreator + public WindowOperatorQuery( + @JsonProperty("dataSource") DataSource dataSource, + @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, Review 
Comment: I am reminded that the equivalent MSQ concept here is `InputSpec`, where the `TableInputSpec` combines datasource, intervals, and filter. Wonder if it would make sense to use that here. ########## core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java: ########## @@ -55,8 +55,15 @@ * Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner, * potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is * called. + * + * This interface extends {@code Comparator<Object>} instead of {@code Comparator<T>} because trying to specialize the + * type of the comparison method can run into issues for comparators of objects that can sometimes be of a different + * java class type. For example, {@code Comparator<Long>} cannot accept Integer objects in its comparison method + * and there is no easy way for this interface definition to allow {@code TypeStrategy<Long>} to actually be a + * {@code Comparator<Number>}. So, we fall back to effectively erasing the generic type and having them all be + * {@code Comparator<Object>}. */ -public interface TypeStrategy<T> extends Comparator<T> +public interface TypeStrategy<T> extends Comparator<Object> Review Comment: I get why you did it this way, but it's a bummer, since nothing really stops the inexorable Objectification of the Ts in this interface, except moving away from Jackson serde of plain `Object` and `Object[]` and `List<Object>`. That's the real issue. I think we'll be able to move away from that as we go to frame and frame channel RPC, so, I am okay with this for that reason. 
########## sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java: ########## @@ -86,10 +87,16 @@ public static List<RelOptRule> rules(PlannerContext plannerContext) PartialDruidQuery.Stage.SORT_PROJECT, PartialDruidQuery::withSortProject ), + new DruidQueryRule<>( Review Comment: Could you adjust things so this rule isn't added unless `windowsAreForClosers: true`? That will limit the impact if it turns out that something is wrong with the rule. ########## processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.RowSignature; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class WindowOperatorQuery extends BaseQuery<RowsAndColumns> +{ + private final RowSignature rowSignature; + private final List<OperatorFactory> operators; + + @JsonCreator + public WindowOperatorQuery( + @JsonProperty("dataSource") DataSource dataSource, + @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, Review Comment: Is it too wild to consider combining DataSource and QuerySegmentSpec as part of this effort? Once I really internalized what each of them means, it feels weird that they're separate. ########## processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.rowsandcols.ArrayListRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; + +/** + * A {@link Segment} that is based on a stream of objects. + */ +public class ArrayListSegment<RowType> implements Segment Review Comment: If you wanted to, you could merge this into `RowBasedSegment`, since you can get the underlying ArrayList out of the Sequence. The code looks something like this. 
``` if (rows instanceof SimpleSequence) { final Iterable<RowType> rowIterable = ((SimpleSequence<RowType>) rows).getIterable(); if (rowIterable instanceof ArrayList) { // do cool stuff } } ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java: ########## @@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery( sorting = null; } + if (partialQuery.getWindow() != null) { + final QueryContext queryContext = plannerContext.queryContext(); + if (queryContext.getBoolean("windowsAreForClosers", false)) { + windowing = Preconditions.checkNotNull( + Windowing.fromCalciteStuff( + partialQuery, + plannerContext, + sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW Review Comment: I think this would be fine as a regular comment. It's saying that it's ok to use the `sourceRowSignature` here, because PartialDruidQuery (which enforces ordering of operators) only allows WINDOW to apply on top of SCAN. And SCAN is where the `sourceRowSignature` comes from. ########## processing/src/main/java/org/apache/druid/query/InlineDataSource.java: ########## @@ -200,6 +201,11 @@ public Iterable<Object[]> getRows() return rows; } + public boolean rowsAreArrayList() Review Comment: Is ArrayList better than List here? ########## sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java: ########## @@ -870,6 +911,12 @@ public Query<?> getQuery() */ private Query<?> computeQuery() { + // TODO(gianm): structure Review Comment: I think when I wrote this comment I was thinking that the code could be structured better. But that was a while ago. I'm over it.
########## sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java: ########## @@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery( sorting = null; } + if (partialQuery.getWindow() != null) { + final QueryContext queryContext = plannerContext.queryContext(); + if (queryContext.getBoolean("windowsAreForClosers", false)) { + windowing = Preconditions.checkNotNull( + Windowing.fromCalciteStuff( + partialQuery, + plannerContext, + sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW + rexBuilder + ) + ); + } else { + plannerContext.setPlanningError("Windowing Not Currently Supported"); Review Comment: People who try to use window functions should ideally see a nicer message. How about something like: > Window functions (OVER) are not supported ########## sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java: ########## @@ -93,7 +93,12 @@ public static RexNode fromFieldAccess( ) { if (project == null) { - // I don't think the factory impl matters here. + // Gian doesn't think the factory impl matters here, he's likely correct. But, upon reading what this is doing, + // we are re-building the list of things in the RelDataType for every single call to `fromFieldAccess`. + // `fromFieldAccess` is called pretty regularly in pretty low-level areas of the code, so it would make sense + // that we are perhaps re-creating the exact same object over and over and over and over again and wasting CPU Review Comment: Re-doing the same work over and over is something of a theme in the way the planning code is structured. It would definitely be good to go through all of that and improve it.
########## sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java: ########## @@ -258,13 +273,33 @@ public static DruidQuery fromPartialQuery( sorting = null; } + if (partialQuery.getWindow() != null) { + final QueryContext queryContext = plannerContext.queryContext(); + if (queryContext.getBoolean("windowsAreForClosers", false)) { Review Comment: ``` public static final String CTX_ENABLE_WINDOW_FUNCTIONS = "windowsAreForClosers"; ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java: ########## @@ -86,10 +87,16 @@ public static List<RelOptRule> rules(PlannerContext plannerContext) PartialDruidQuery.Stage.SORT_PROJECT, PartialDruidQuery::withSortProject ), + new DruidQueryRule<>( + Window.class, + PartialDruidQuery.Stage.WINDOW, + PartialDruidQuery::withWindow + ), DruidOuterQueryRule.AGGREGATE, DruidOuterQueryRule.WHERE_FILTER, DruidOuterQueryRule.SELECT_PROJECT, DruidOuterQueryRule.SORT, + DruidOuterQueryRule.WINDOW, Review Comment: Same with this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
