This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 209c060191 fix distinct query in v2 engine (#9582)
209c060191 is described below

commit 209c060191812dd3f026da07e0cdd6dd639f8f5e
Author: Rong Rong <[email protected]>
AuthorDate: Mon Oct 17 15:16:15 2022 -0700

    fix distinct query in v2 engine (#9582)
    
    This is an alternative to #9570 and mostly follows the same idea, but:
    - [x] instead of modifying the distinct table, directly return the final result from DistinctResultsBlock
    - [x] the distinct table can be a non-main table after the combine operator, so use `getRecords` instead of `getFinalResults`
    
    TODO
    - For backward-compatibility reasons, `DistinctDataTableReducer` needs to handle both the object version and the normal row version of the data table, so the code is a bit complicated.
      - We should deprecate the object version once the 0.12 release is done.
      - We should optimize the reduce algorithm to be more efficient and avoid excessive format conversion.
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../blocks/results/DistinctResultsBlock.java       | 15 +++-----
 .../query/reduce/DistinctDataTableReducer.java     | 42 +++++++++++++++++++---
 .../query/selection/SelectionOperatorUtils.java    | 26 ++++++++++----
 .../blocks/results/ResultsBlockUtilsTest.java      | 17 +++------
 .../runtime/operator/MailboxReceiveOperator.java   | 14 ++++----
 .../runtime/operator/MailboxSendOperator.java      |  1 +
 .../query/runtime/utils/ServerRequestUtils.java    | 12 ++++---
 .../java/org/apache/pinot/query/QueryTestSet.java  |  3 ++
 .../pinot/query/runtime/QueryRunnerTest.java       |  2 +-
 9 files changed, 84 insertions(+), 48 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index 8db1dfc474..e4f63966f8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -24,13 +24,11 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.table.Record;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 
 
 /**
@@ -70,13 +68,8 @@ public class DistinctResultsBlock extends BaseResultsBlock {
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws IOException {
-    String[] columnNames = new String[]{_distinctFunction.getColumnName()};
-    ColumnDataType[] columnDataTypes = new 
ColumnDataType[]{ColumnDataType.OBJECT};
-    DataTableBuilder dataTableBuilder =
-        DataTableBuilderFactory.getDataTableBuilder(new 
DataSchema(columnNames, columnDataTypes));
-    dataTableBuilder.startRow();
-    dataTableBuilder.setColumn(0, _distinctTable);
-    dataTableBuilder.finishRow();
-    return dataTableBuilder.build();
+    Collection<Object[]> rows = getRows(queryContext);
+    return SelectionOperatorUtils.getDataTableFromRows(rows, 
_distinctTable.getDataSchema(),
+        queryContext.isNullHandlingEnabled());
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 97c39b0e40..af25afe24e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -35,7 +35,9 @@ import org.apache.pinot.core.data.table.Record;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -65,13 +67,43 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
     // inside a DataTable
 
     // Gather all non-empty DistinctTables
+    // TODO: until we upgrade to newer version of pinot, we have to keep both 
code path. remove after 0.12.0 release.
+    // This is to work with server rolling upgrade when partially returned as 
DistinctTable Obj and partially regular
+    // DataTable; if all returns are DataTable we can directly merge with 
priority queue (with dedup).
     List<DistinctTable> nonEmptyDistinctTables = new 
ArrayList<>(dataTableMap.size());
     for (DataTable dataTable : dataTableMap.values()) {
-      DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
-      assert customObject != null;
-      DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
-      if (!distinctTable.isEmpty()) {
-        nonEmptyDistinctTables.add(distinctTable);
+      // Do not use the cached data schema because it might be either single 
object (legacy) or normal data table
+      dataSchema = dataTable.getDataSchema();
+      int numColumns = dataSchema.size();
+      if (numColumns == 1 && dataSchema.getColumnDataType(0) == 
ColumnDataType.OBJECT) {
+        // DistinctTable is still being returned as a single object
+        DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+        assert customObject != null;
+        DistinctTable distinctTable = 
ObjectSerDeUtils.deserialize(customObject);
+        if (!distinctTable.isEmpty()) {
+          nonEmptyDistinctTables.add(distinctTable);
+        }
+      } else {
+        // DistinctTable is being returned as normal data table
+        int numRows = dataTable.getNumberOfRows();
+        if (numRows > 0) {
+          List<Record> records = new ArrayList<>(numRows);
+          if (_queryContext.isNullHandlingEnabled()) {
+            RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+            for (int coldId = 0; coldId < numColumns; coldId++) {
+              nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+            }
+            for (int rowId = 0; rowId < numRows; rowId++) {
+              records.add(new Record(
+                  
SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, 
rowId, nullBitmaps)));
+            }
+          } else {
+            for (int rowId = 0; rowId < numRows; rowId++) {
+              records.add(new 
Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
+            }
+          }
+          nonEmptyDistinctTables.add(new DistinctTable(dataSchema, records));
+        }
       }
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 649e377942..14262da2d1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -419,6 +419,24 @@ public class SelectionOperatorUtils {
     return row;
   }
 
+  /**
+   * Extract a selection row from {@link DataTable} with potential null 
values. (Broker side)
+   *
+   * @param dataTable data table.
+   * @param rowId row id.
+   * @return selection row.
+   */
+  public static Object[] extractRowFromDataTableWithNullHandling(DataTable 
dataTable, int rowId,
+      RoaringBitmap[] nullBitmaps) {
+    Object[] row = extractRowFromDataTable(dataTable, rowId);
+    for (int colId = 0; colId < nullBitmaps.length; colId++) {
+      if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
+        row[colId] = null;
+      }
+    }
+    return row;
+  }
+
   /**
    * Reduces a collection of {@link DataTable}s to selection rows for 
selection queries without <code>ORDER BY</code>.
    * (Broker side)
@@ -436,13 +454,7 @@ public class SelectionOperatorUtils {
         }
         for (int rowId = 0; rowId < numRows; rowId++) {
           if (rows.size() < limit) {
-            Object[] row = extractRowFromDataTable(dataTable, rowId);
-            for (int colId = 0; colId < numColumns; colId++) {
-              if (nullBitmaps[colId] != null && 
nullBitmaps[colId].contains(rowId)) {
-                row[colId] = null;
-              }
-            }
-            rows.add(row);
+            rows.add(extractRowFromDataTableWithNullHandling(dataTable, rowId, 
nullBitmaps));
           } else {
             break;
           }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
index 75fb625a94..eaf9d3fbe3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
@@ -21,14 +21,11 @@ package org.apache.pinot.core.operator.blocks.results;
 import java.io.IOException;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 
 
 public class ResultsBlockUtilsTest {
@@ -74,15 +71,9 @@ public class ResultsBlockUtilsTest {
     queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT 
a, b FROM testTable WHERE foo = 'bar'");
     dataTable = 
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
-    assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
-    assertEquals(dataSchema.getColumnDataTypes(), new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.OBJECT});
-    assertEquals(dataTable.getNumberOfRows(), 1);
-    DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
-    assertNotNull(customObject);
-    DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
-    assertEquals(distinctTable.size(), 0);
-    assertEquals(distinctTable.getDataSchema().getColumnNames(), new 
String[]{"a", "b"});
-    assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING});
+    assertEquals(dataSchema.getColumnNames(), new String[]{"a", "b"});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.STRING});
+    assertEquals(dataTable.getNumberOfRows(), 0);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 8169fc9121..8f549bdaa5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -80,13 +80,13 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
         }
       }
 
-      // FIXME: there's a bug where singletonInstance may be null in the case 
of a JOIN where
-      // one side is BROADCAST and the other is SINGLETON (this is the case 
with nested loop
-      // joins for inequality conditions). This causes NPEs in the logs, but 
actually works
-      // because the side that hits the NPE doesn't expect to get any data 
anyway (that's the
-      // side that gets the broadcast from one side but nothing from the 
SINGLETON)
-      // FIXME: https://github.com/apache/pinot/issues/9592
-      _sendingStageInstances = Collections.singletonList(singletonInstance);
+      if (singletonInstance == null) {
+        // TODO: fix WorkerManager assignment, this should not happen if we 
properly assign workers.
+        // see: https://github.com/apache/pinot/issues/9592
+        _sendingStageInstances = Collections.emptyList();
+      } else {
+        _sendingStageInstances = Collections.singletonList(singletonInstance);
+      }
     } else {
       _sendingStageInstances = sendingStageInstances;
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3e358ccc2c..db3bbe8610 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -89,6 +89,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
           singletonInstance = serverInstance;
         }
       }
+      Preconditions.checkNotNull(singletonInstance, "Unable to find receiving 
instance for singleton exchange");
       _receivingStageInstances = Collections.singletonList(singletonInstance);
     } else {
       _receivingStageInstances = receivingStageInstances;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 36b445b310..2e68c601ab 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.FilterKind;
+import 
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -69,7 +70,10 @@ import 
org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
 public class ServerRequestUtils {
   private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
-      ImmutableList.of(PredicateComparisonRewriter.class.getName());
+      ImmutableList.of(
+          PredicateComparisonRewriter.class.getName(),
+          NonAggregationGroupByToDistinctQueryRewriter.class.getName()
+      );
   private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
       QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
@@ -182,12 +186,12 @@ public class ServerRequestUtils {
       pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
           ((ProjectNode) node).getProjects(), pinotQuery));
     } else if (node instanceof AggregateNode) {
-      // set agg list
-      
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getSelectList(),
-          ((AggregateNode) node).getAggCalls(), pinotQuery));
       // set group-by list
       pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
           ((AggregateNode) node).getGroupSet(), pinotQuery));
+      // set agg list
+      
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getGroupByList(),
+          ((AggregateNode) node).getAggCalls(), pinotQuery));
     } else if (node instanceof SortNode) {
       if (((SortNode) node).getCollationKeys().size() > 0) {
         
pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode)
 node).getCollationKeys(),
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
index 6540c9c515..0456872a38 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
@@ -190,6 +190,9 @@ public class QueryTestSet {
         //   - distinct value done via GROUP BY with empty expr aggregation 
list.
         new Object[]{"SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 "
             + " WHERE b.col3 > 0 GROUP BY a.col2, a.col3"},
+        new Object[]{"SELECT col3 FROM a GROUP BY col3, col1"},
+        new Object[]{"SELECT col1 FROM a GROUP BY col3, col1"},
+        new Object[]{"SELECT AVG(col3) FROM (SELECT col1, col3 FROM a WHERE 
col3 > 1 GROUP BY col1, col3)"},
 
         // Test optimized constant literal.
         new Object[]{"SELECT col1 FROM a WHERE col3 > 0 AND col3 < -5"},
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 06fd707a26..1d03df946f 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -119,7 +119,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
       } else if (l instanceof String) {
         return ((String) l).compareTo((String) r);
       } else {
-        throw new RuntimeException("non supported type");
+        throw new RuntimeException("non supported type " + l.getClass());
       }
     };
     Comparator<Object[]> rowComp = (l, r) -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to