clintropolis commented on code in PR #13458:
URL: https://github.com/apache/druid/pull/13458#discussion_r1040349294


##########
processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.RowSignature;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+/**
+ * Tests the WindowOperatorQuery, it would actually be a lot better to run 
this through some tests that actually
+ * validate the operation of queries, but all of the efforts to build out test 
scaffolding and framework have gone
+ * into building things out for SQL query operations.  As such, all of the 
tests that validate the actual native
+ * functionality actually run from the `druid-sql` module instead of this 
module.  It would be best to de-couple
+ * these and have all of the native, query processing tests happen directly 
here in processing and have the SQL
+ * tests only concern themselves with how they plan SQL into Native, but 
that's a bit big of a nugget to bite off
+ * at this point in time, so instead we continue the building of technical 
debt by making this "test" run lines
+ * of code without actually testing much meaningful behavior.
+ * <p>
+ * For now, view CalciteWindowQueryTest for actual tests that validate 
behavior.
+ */
+public class WindowOperatorQueryTest
+{
+  WindowOperatorQuery query;
+
+  @Before
+  public void setUp()
+  {
+    query = new WindowOperatorQuery(
+        InlineDataSource.fromIterable(new ArrayList<>(), RowSignature.empty()),
+        ImmutableMap.of("sally", "sue"),
+        RowSignature.empty(),
+        new ArrayList<>()
+    );
+  }
+

Review Comment:
   Could you add a JSON serde test (unless I missed it somewhere) in a 
follow-up PR?



##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowAggregateProcessor.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.window;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
+import org.apache.druid.query.rowsandcols.DefaultOnHeapAggregatable;
+import org.apache.druid.query.rowsandcols.OnHeapAggregatable;
+import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
+import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class WindowAggregateProcessor implements Processor
+{
+  @Nullable
+  private static <T> List<T> emptyToNull(List<T> list)
+  {
+    if (list == null || list.isEmpty()) {
+      return null;
+    } else {
+      return list;
+    }
+  }
+
+  private final List<AggregatorFactory> aggregations;
+  private final List<AggregatorFactory> cumulativeAggregations;
+
+  @JsonCreator
+  public WindowAggregateProcessor(
+      @JsonProperty("aggregations") List<AggregatorFactory> aggregations,
+      @JsonProperty("cumulativeAggregations") List<AggregatorFactory> 
cumulativeAggregations
+  )
+  {
+    this.aggregations = emptyToNull(aggregations);
+    this.cumulativeAggregations = emptyToNull(cumulativeAggregations);
+  }
+
+  @JsonProperty("aggregations")
+  public List<AggregatorFactory> getAggregations()
+  {
+    return aggregations;
+  }
+
+  @JsonProperty("cumulativeAggregations")
+  public List<AggregatorFactory> getCumulativeAggregations()
+  {
+    return cumulativeAggregations;
+  }
+
+  @Override
+  public RowsAndColumns process(RowsAndColumns inputPartition)
+  {
+    AppendableRowsAndColumns retVal = 
RowsAndColumns.expectAppendable(inputPartition);
+
+    if (aggregations != null) {
+      OnHeapAggregatable aggregatable = 
inputPartition.as(OnHeapAggregatable.class);

Review Comment:
   Is this a stub? (None of the `RowsAndColumns` implementations seem to 
implement `OnHeapAggregatable`...)



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.column;
+
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+
+/**
+ * Allows for accessing a column, provides methods to enable cell-by-cell 
access.
+ */
+public interface ColumnAccessor
+{
+  /**
+   * Get the type of the Column
+   *
+   * @return the type of the Column
+   */
+  ColumnType getType();
+
+  /**
+   * Get the number of cells
+   *
+   * @return the number of cells
+   */
+  int numRows();
+
+  /**
+   * Get whether the value of a cell is null
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return true if the value is null
+   */
+  boolean isNull(int rowNum);
+
+  /**
+   * Get the {@link Object} representation of the cell.
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return the {@link Object} representation of the cell.  Returns {@code 
null} If {@link #isNull} is true.
+   */
+  @Nullable
+  Object getObject(int rowNum);
+
+  /**
+   * Get the primitive {@code double} representation of the cell.
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return the primitive {@code double} representation of the cell.  Returns 
{@code 0D} If {@link #isNull} is true.
+   */
+  double getDouble(int rowNum);
+
+  /**
+   * Get the primitive {@code float} representation of the cell.
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return the primitive {@code float} representation of the cell.  Returns 
{@code 0F} If {@link #isNull} is true.
+   */
+  float getFloat(int rowNum);
+
+  /**
+   * Get the primitive {@code long} representation of the cell.
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return the primitive {@code long} representation of the cell.  Returns 
{@code 0L} If {@link #isNull} is true.
+   */
+  long getLong(int rowNum);
+
+  /**
+   * Get the primitive {@code int} representation of the cell.
+   *
+   * @param rowNum the cell id, 0-indexed
+   * @return the primitive {@code int} representation of the cell.  Returns 
{@code 0} If {@link #isNull} is true.
+   */
+  int getInt(int rowNum);

Review Comment:
   What are the plans for `getInt`? When should it be used? Or is it just for 
the row number thing?



##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowAggregateProcessor.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.window;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
+import org.apache.druid.query.rowsandcols.DefaultOnHeapAggregatable;
+import org.apache.druid.query.rowsandcols.OnHeapAggregatable;
+import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
+import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class WindowAggregateProcessor implements Processor
+{
+  @Nullable
+  private static <T> List<T> emptyToNull(List<T> list)
+  {
+    if (list == null || list.isEmpty()) {
+      return null;
+    } else {
+      return list;
+    }
+  }
+
+  private final List<AggregatorFactory> aggregations;
+  private final List<AggregatorFactory> cumulativeAggregations;
+
+  @JsonCreator
+  public WindowAggregateProcessor(
+      @JsonProperty("aggregations") List<AggregatorFactory> aggregations,
+      @JsonProperty("cumulativeAggregations") List<AggregatorFactory> 
cumulativeAggregations
+  )
+  {
+    this.aggregations = emptyToNull(aggregations);
+    this.cumulativeAggregations = emptyToNull(cumulativeAggregations);
+  }
+
+  @JsonProperty("aggregations")
+  public List<AggregatorFactory> getAggregations()
+  {
+    return aggregations;
+  }
+
+  @JsonProperty("cumulativeAggregations")
+  public List<AggregatorFactory> getCumulativeAggregations()
+  {
+    return cumulativeAggregations;
+  }
+
+  @Override
+  public RowsAndColumns process(RowsAndColumns inputPartition)
+  {
+    AppendableRowsAndColumns retVal = 
RowsAndColumns.expectAppendable(inputPartition);
+
+    if (aggregations != null) {
+      OnHeapAggregatable aggregatable = 
inputPartition.as(OnHeapAggregatable.class);
+      if (aggregatable == null) {
+        aggregatable = new DefaultOnHeapAggregatable(inputPartition);
+      }
+      final ArrayList<Object> aggregatedVals = 
aggregatable.aggregateAll(aggregations);
+
+      for (int i = 0; i < aggregations.size(); ++i) {
+        final AggregatorFactory agg = aggregations.get(i);
+        retVal.addColumn(
+            agg.getName(),
+            new ConstantObjectColumn(aggregatedVals.get(i), 
inputPartition.numRows(), agg.getResultType())
+        );
+      }
+    }
+
+    if (cumulativeAggregations != null) {
+      OnHeapCumulativeAggregatable cummulativeAgg = 
inputPartition.as(OnHeapCumulativeAggregatable.class);

Review Comment:
   Same comment as above about this being a stub (`OnHeapCumulativeAggregatable`).



##########
processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.window.ranking;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.operator.window.Processor;
+import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
+import org.apache.druid.query.rowsandcols.DefaultSortedGroupPartitioner;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.SortedGroupPartitioner;
+import org.apache.druid.query.rowsandcols.column.Column;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This Processor assumes that data has already been sorted for it.  It does 
not re-sort the data and if it is given
+ * data that is not in the correct sorted order, its operation is undefined.
+ */
+public abstract class WindowRankingProcessorBase implements Processor
+{
+  private final List<String> groupingCols;
+  private final String outputColumn;
+
+  public WindowRankingProcessorBase(
+      List<String> groupingCols,
+      String outputColumn
+  )
+  {
+    this.groupingCols = groupingCols;
+    this.outputColumn = outputColumn;
+  }
+
+  @JsonProperty("group")
+  public List<String> getGroupingCols()
+  {
+    return groupingCols;
+  }
+
+  @JsonProperty("outputColumn")
+  public String getOutputColumn()
+  {
+    return outputColumn;
+  }
+
+  public RowsAndColumns processInternal(
+      RowsAndColumns incomingPartition,
+      Function<int[], Column> fn
+  )
+  {
+    final AppendableRowsAndColumns retVal = 
RowsAndColumns.expectAppendable(incomingPartition);
+
+    SortedGroupPartitioner groupPartitioner = 
incomingPartition.as(SortedGroupPartitioner.class);

Review Comment:
   Same question as above regarding this being a stub (`SortedGroupPartitioner`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to