kgyrtkirk commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1679434719


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -270,7 +269,6 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
             ));
           }
           objectsOfASingleRac.add(currentRow);

Review Comment:
   the `if` check above should come after this line, and the 10 lines below 
this one should be removed



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
         if (outputRow == null) {
           outputRow = currentRow;
           objectsOfASingleRac.add(currentRow);
-        } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
+        } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {

Review Comment:
   this is a little confusing together with the `runAllOpsOnSingleRac` method; I 
believe the operators should be constructed and run only once, not rebuilt 
for every RAC
   
   what happens here seems quite similar to what the 
`NaivePartitioningOperator` does, but in a streaming fashion...
   I think it would be better to implement this as an operator; that way the 
`partitionColumnNames` could also live inside the operator and would not need a 
separate path to be passed along.
   
   but since this is a bug-fix PR, this might be out of scope...
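
For illustration only, the "streaming partitioning as an operator" idea could look roughly like the sketch below. It is not Druid's `Operator` API: `StreamingPartitioner`, `push` and `finish` are hypothetical names, and rows are modelled as plain maps instead of RACs. The point is that the partition column names live inside the object, so the caller only pushes rows and receives completed partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch, not a Druid class: emits one list of rows per partition
// as soon as a row with a different partition key arrives.
final class StreamingPartitioner
{
  private final List<String> partitionColumnNames;
  private final Consumer<List<Map<String, Object>>> downstream;
  private List<Map<String, Object>> current = new ArrayList<>();

  StreamingPartitioner(
      List<String> partitionColumnNames,
      Consumer<List<Map<String, Object>>> downstream
  )
  {
    this.partitionColumnNames = List.copyOf(partitionColumnNames);
    this.downstream = downstream;
  }

  // Assumes the input is already sorted (or clustered) by the partition columns.
  void push(Map<String, Object> row)
  {
    if (!current.isEmpty() && !samePartition(current.get(0), row)) {
      flush();
    }
    current.add(row);
  }

  // Must be called once the input is exhausted, to emit the trailing partition.
  void finish()
  {
    if (!current.isEmpty()) {
      flush();
    }
  }

  private void flush()
  {
    downstream.accept(current);
    current = new ArrayList<>();
  }

  private boolean samePartition(Map<String, Object> a, Map<String, Object> b)
  {
    for (String column : partitionColumnNames) {
      if (!Objects.equals(a.get(column), b.get(column))) {
        return false;
      }
    }
    return true;
  }
}
```

With something of this shape, the downstream consumer would be the already-constructed operator chain, so the operators are built once and only fed one partition after another.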



##########
sql/src/test/java/org/apache/druid/sql/calcite/WindowQueryTestBase.java:
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteStreams;
+import com.google.inject.Injector;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.TimestampParser;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.sql.calcite.planner.PlannerCaptureHook;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.util.SqlTestFramework;
+import org.apache.druid.sql.calcite.util.TestDataBuilder;
+import org.joda.time.DateTime;
+import org.joda.time.LocalTime;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@SqlTestFrameworkConfig.ComponentSupplier(WindowQueryTestBase.DrillComponentSupplier.class)
+public abstract class WindowQueryTestBase extends BaseCalciteQueryTest
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  @RegisterExtension
+  private final DisableUnless.DisableUnlessRule disableWhenNonSqlCompat = DisableUnless.SQL_COMPATIBLE;
+
+  @RegisterExtension
+  private final NotYetSupported.NotYetSupportedProcessor ignoreProcessor = new NotYetSupported.NotYetSupportedProcessor();
+
+  @RegisterExtension
+  protected TestCaseLoaderRule testCaseLoaderRule;
+
+  protected static class WindowTestCase
+  {
+    protected final String query;
+    protected final List<String[]> results;
+    protected String filename;
+    protected String resourcePath;
+
+    protected WindowTestCase(String filename, String resourcePath)
+    {
+      try {
+        this.filename = filename;
+        this.resourcePath = resourcePath;
+        this.query = readStringFromResource(".q");
+        String resultsStr = readStringFromResource(".e");
+        String[] lines = resultsStr.split("\n");
+        results = new ArrayList<>();
+        if (!resultsStr.isEmpty()) {
+          for (String string : lines) {
+            String[] cols = string.split("\t");
+            results.add(cols);
+          }
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException(
+            String.format(Locale.ENGLISH, "Encountered exception while loading testcase [%s]", filename),
+            e
+        );
+      }
+    }
+
+    @Nonnull
+    protected String getQueryString()
+    {
+      return query;
+    }
+
+    @Nonnull
+    protected List<String[]> getExpectedResults()
+    {
+      return results;
+    }
+
+    @Nonnull
+    protected String readStringFromResource(String s) throws IOException
+    {
+      final String query;
+      try (InputStream queryIn = ClassLoader.getSystemResourceAsStream(resourcePath + filename + s)) {
+        query = new String(ByteStreams.toByteArray(queryIn), StandardCharsets.UTF_8);
+      }
+      return query;
+    }
+  }
+
+  protected abstract static class TestCaseLoaderRule implements BeforeEachCallback
+  {
+    protected WindowTestCase testCase = null;
+
+    @Override
+    public void beforeEach(ExtensionContext context)
+    {
+      Method method = context.getTestMethod().get();
+      testCase = loadTestCase(method);
+    }
+
+    protected abstract WindowTestCase loadTestCase(Method method);
+  }
+
+  protected static class DrillComponentSupplier extends SqlTestFramework.StandardComponentSupplier
+  {
+    public DrillComponentSupplier(TempDirProducer tempFolderProducer)
+    {
+      super(tempFolderProducer);
+    }
+
+    @Override
+    public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
+        QueryRunnerFactoryConglomerate conglomerate,
+        JoinableFactoryWrapper joinableFactory,
+        Injector injector
+    )
+    {
+      final SpecificSegmentsQuerySegmentWalker retVal = super.createQuerySegmentWalker(
+          conglomerate,
+          joinableFactory,
+          injector);
+
+      final File tmpFolder = tempDirProducer.newTempFolder();
+      TestDataBuilder.attachIndexesForDrillTestDatasources(retVal, tmpFolder);
+      return retVal;
+    }
+  }
+
+  private class TextualResultsVerifier implements ResultsVerifier
+  {
+    protected final List<String[]> expectedResultsText;
+    @Nullable
+    protected final RowSignature expectedResultRowSignature;
+    private RowSignature currentRowSignature;
+
+    public TextualResultsVerifier(List<String[]> expectedResultsString, RowSignature expectedSignature)
+    {
+      this.expectedResultsText = expectedResultsString;
+      this.expectedResultRowSignature = expectedSignature;
+    }
+
+    @Override
+    public void verifyRowSignature(RowSignature rowSignature)
+    {
+      if (expectedResultRowSignature != null) {
+        Assert.assertEquals(expectedResultRowSignature, rowSignature);
+      }
+      currentRowSignature = rowSignature;
+    }
+
+    @Override
+    public void verify(String sql, QueryTestRunner.QueryResults queryResults)
+    {
+      List<Object[]> results = queryResults.results;
+      List<Object[]> expectedResults = parseResults(currentRowSignature, expectedResultsText);
+      try {
+        Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResultsText.size(), results.size());
+        if (!isOrdered(queryResults)) {
+          // in case the resultset is not ordered; order via the same comparator before comparison
+          results.sort(new ArrayRowCmp());
+          expectedResults.sort(new ArrayRowCmp());
+        }
+        assertResultsValid(ResultMatchMode.EQUALS_RELATIVE_1000_ULPS, expectedResults, queryResults);
+      }
+      catch (AssertionError e) {
+        log.info("query: %s", sql);
+        log.info(resultsToString("Expected", expectedResults));
+        log.info(resultsToString("Actual", results));
+        throw new AssertionError(StringUtils.format("%s while processing: %s", e.getMessage(), sql), e);
+      }
+    }
+
+    private boolean isOrdered(QueryTestRunner.QueryResults queryResults)
+    {
+      SqlNode sqlNode = queryResults.capture.getSqlNode();
+      return SqlToRelConverter.isOrdered(sqlNode);
+    }
+  }
+
+  private static class ArrayRowCmp implements Comparator<Object[]>
+  {
+    @Override
+    public int compare(Object[] arg0, Object[] arg1)
+    {
+      String s0 = Arrays.toString(arg0);
+      String s1 = Arrays.toString(arg1);
+      return s0.compareTo(s1);
+    }
+  }
+
+  private static List<Object[]> parseResults(RowSignature rs, List<String[]> results)

Review Comment:
   many of these methods are highly specific to the `drill` testcases; why do we 
need to move them?
   
   I would rather see more `Extensions` instead of `TestBase` classes...



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -162,6 +207,28 @@ public QueryDefinition makeQueryDefinition(
           );
         }
 
+        log.info("Using row signature [%s] for window stage.", stageRowSignature);
+
+        boolean partitionOperatorExists = false;
+        List<String> currentPartitionColumns = new ArrayList<>();
+        for (OperatorFactory of : operatorList.get(i)) {
+          if (of instanceof NaivePartitioningOperatorFactory) {
+            for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
+              currentPartitionColumns.add(s);
+              partitionOperatorExists = true;
+            }
+          }
+        }
+
+        if (partitionOperatorExists) {
+          partitionColumnNames = currentPartitionColumns;
+        }
+
+        log.info(
+            "Columns which would be used to define partitioning boundaries for this window stage are [%s]",
+            partitionColumnNames
+        );

Review Comment:
   wouldn't it be a bit more readable to have this inside a method?
   I don't agree with going through all the operators and adding every one's 
partition columns to a single list...
   
   the code here naturally wants to have an object like:
   ```
   class WndStage {
     PartitionOperator partitionOperator;
     SortOperator sortOperator;
     List<Operator>  workOperators;
   }
   ```
   even the existence of such a class would ensure that there is no more than 
one partition operator in a stage, and it also gives a home for methods like this
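
To make the suggestion a little more concrete, here is a rough sketch of such a holder. Everything in it is an assumption for illustration: `OpFactory`, `PartitionOpFactory`, `SortOpFactory` and `WorkOpFactory` are stand-ins for the real operator factory types, and `fromOperators` is a hypothetical name. It classifies a flat operator list once and refuses a second partition operator, instead of merging the partition columns of several operators into one list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real operator factory types.
interface OpFactory {}

final class PartitionOpFactory implements OpFactory
{
  final List<String> partitionColumns;

  PartitionOpFactory(List<String> partitionColumns)
  {
    this.partitionColumns = List.copyOf(partitionColumns);
  }
}

final class SortOpFactory implements OpFactory {}

final class WorkOpFactory implements OpFactory {}

final class WndStage
{
  final PartitionOpFactory partitionOperator; // null when the stage has none
  final SortOpFactory sortOperator;           // null when the stage has none
  final List<OpFactory> workOperators;

  private WndStage(PartitionOpFactory partition, SortOpFactory sort, List<OpFactory> work)
  {
    this.partitionOperator = partition;
    this.sortOperator = sort;
    this.workOperators = List.copyOf(work);
  }

  // Classifies the operators of one stage; fails fast on a second partition or sort operator.
  static WndStage fromOperators(List<? extends OpFactory> operators)
  {
    PartitionOpFactory partition = null;
    SortOpFactory sort = null;
    List<OpFactory> work = new ArrayList<>();
    for (OpFactory of : operators) {
      if (of instanceof PartitionOpFactory) {
        if (partition != null) {
          throw new IllegalArgumentException("more than one partition operator in a stage");
        }
        partition = (PartitionOpFactory) of;
      } else if (of instanceof SortOpFactory) {
        if (sort != null) {
          throw new IllegalArgumentException("more than one sort operator in a stage");
        }
        sort = (SortOpFactory) of;
      } else {
        work.add(of);
      }
    }
    return new WndStage(partition, sort, work);
  }

  // Natural home for the lookup that is currently inlined in makeQueryDefinition.
  List<String> partitionColumnNames()
  {
    return partitionOperator == null ? List.of() : partitionOperator.partitionColumns;
  }
}
```

The sketch returns an empty list when a stage has no partition operator; the PR instead keeps the previous stage's `partitionColumnNames` in that case, which the caller would still have to handle.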



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
         if (outputRow == null) {
           outputRow = currentRow;
           objectsOfASingleRac.add(currentRow);
-        } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
+        } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {

Review Comment:
   looking at what `comparePartitionKeys` is doing (it produces garbage) and 
that it gets called for each row... I'm not sure this is the right approach...
   
   it would probably be better to:
   * push all rows into the rac until it hits the roof
   * use `ArrayListRowsAndColumns`'s partitioning to identify the smaller 
sections
   * submit all partitions except the last
   * move those rows into a new starter rac; restart from the beginning
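
The steps above can be sketched as follows. This is only an illustration under stated assumptions: rows are plain maps, `BufferingPartitioner` and its methods are hypothetical names, and a simple run-splitting helper stands in for `ArrayListRowsAndColumns`'s partitioning. Rows are buffered up to a limit, the buffer is split into partitions in one pass, every partition except the last is submitted, and the last (possibly incomplete) one seeds the next buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch, not a Druid class.
final class BufferingPartitioner
{
  private final List<String> partitionColumnNames;
  private final int maxBufferedRows;
  private final Consumer<List<Map<String, Object>>> downstream;
  private List<Map<String, Object>> buffer = new ArrayList<>();
  private int drainThreshold;

  BufferingPartitioner(
      List<String> partitionColumnNames,
      int maxBufferedRows,
      Consumer<List<Map<String, Object>>> downstream
  )
  {
    this.partitionColumnNames = List.copyOf(partitionColumnNames);
    this.maxBufferedRows = maxBufferedRows;
    this.downstream = downstream;
    this.drainThreshold = maxBufferedRows;
  }

  // Assumes the input is already sorted (or clustered) by the partition columns.
  void push(Map<String, Object> row)
  {
    buffer.add(row);
    if (buffer.size() >= drainThreshold) {
      List<List<Map<String, Object>>> parts = split(buffer);
      // Submit every partition except the last, which may still receive rows.
      for (int i = 0; i < parts.size() - 1; i++) {
        downstream.accept(parts.get(i));
      }
      // The last partition becomes the new starter buffer.
      buffer = parts.get(parts.size() - 1);
      // Wait for another full batch before splitting again.
      drainThreshold = buffer.size() + maxBufferedRows;
    }
  }

  // End of input: the trailing partition is complete as well.
  void finish()
  {
    for (List<Map<String, Object>> part : split(buffer)) {
      downstream.accept(part);
    }
    buffer = new ArrayList<>();
    drainThreshold = maxBufferedRows;
  }

  // Splits rows into runs that share the same partition key.
  private List<List<Map<String, Object>>> split(List<Map<String, Object>> rows)
  {
    List<List<Map<String, Object>>> parts = new ArrayList<>();
    List<Map<String, Object>> current = null;
    for (Map<String, Object> row : rows) {
      if (current == null || !sameKey(current.get(0), row)) {
        current = new ArrayList<>();
        parts.add(current);
      }
      current.add(row);
    }
    return parts;
  }

  private boolean sameKey(Map<String, Object> a, Map<String, Object> b)
  {
    for (String column : partitionColumnNames) {
      if (!Objects.equals(a.get(column), b.get(column))) {
        return false;
      }
    }
    return true;
  }
}
```

A single partition larger than the limit still has to be held in full, because it cannot be submitted before its end is seen; the limit only bounds how often the buffer is split.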
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
