This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 209c060191 fix distinct query in v2 engine (#9582)
209c060191 is described below
commit 209c060191812dd3f026da07e0cdd6dd639f8f5e
Author: Rong Rong <[email protected]>
AuthorDate: Mon Oct 17 15:16:15 2022 -0700
fix distinct query in v2 engine (#9582)
This is an alternative to #9570.
It largely follows the same idea, but:
- [x] instead of modifying the distinct table, directly return the final
result from DistinctResultBlock
- [x] the distinct table can be non-main after the combine operator, so use
`getRecords` instead of `getFinalResults`
TODO
- for backward-compatibility reasons, `DistinctDataTableReducer` needs to
handle both the object version and the normal row version of the data table,
so the code is a bit complicated.
- we should deprecate the object version once the 0.12 release is done
- we should optimize the reduce algorithm to be more efficient without
too much format conversion
Co-authored-by: Rong Rong <[email protected]>
---
.../blocks/results/DistinctResultsBlock.java | 15 +++-----
.../query/reduce/DistinctDataTableReducer.java | 42 +++++++++++++++++++---
.../query/selection/SelectionOperatorUtils.java | 26 ++++++++++----
.../blocks/results/ResultsBlockUtilsTest.java | 17 +++------
.../runtime/operator/MailboxReceiveOperator.java | 14 ++++----
.../runtime/operator/MailboxSendOperator.java | 1 +
.../query/runtime/utils/ServerRequestUtils.java | 12 ++++---
.../java/org/apache/pinot/query/QueryTestSet.java | 3 ++
.../pinot/query/runtime/QueryRunnerTest.java | 2 +-
9 files changed, 84 insertions(+), 48 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index 8db1dfc474..e4f63966f8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -24,13 +24,11 @@ import java.util.Collection;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.data.table.Record;
import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
/**
@@ -70,13 +68,8 @@ public class DistinctResultsBlock extends BaseResultsBlock {
@Override
public DataTable getDataTable(QueryContext queryContext)
throws IOException {
- String[] columnNames = new String[]{_distinctFunction.getColumnName()};
- ColumnDataType[] columnDataTypes = new
ColumnDataType[]{ColumnDataType.OBJECT};
- DataTableBuilder dataTableBuilder =
- DataTableBuilderFactory.getDataTableBuilder(new
DataSchema(columnNames, columnDataTypes));
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0, _distinctTable);
- dataTableBuilder.finishRow();
- return dataTableBuilder.build();
+ Collection<Object[]> rows = getRows(queryContext);
+ return SelectionOperatorUtils.getDataTableFromRows(rows,
_distinctTable.getDataSchema(),
+ queryContext.isNullHandlingEnabled());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 97c39b0e40..af25afe24e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -35,7 +35,9 @@ import org.apache.pinot.core.data.table.Record;
import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.roaringbitmap.RoaringBitmap;
/**
@@ -65,13 +67,43 @@ public class DistinctDataTableReducer implements
DataTableReducer {
// inside a DataTable
// Gather all non-empty DistinctTables
+ // TODO: until we upgrade to newer version of pinot, we have to keep both
code path. remove after 0.12.0 release.
+ // This is to work with server rolling upgrade when partially returned as
DistinctTable Obj and partially regular
+ // DataTable; if all returns are DataTable we can directly merge with
priority queue (with dedup).
List<DistinctTable> nonEmptyDistinctTables = new
ArrayList<>(dataTableMap.size());
for (DataTable dataTable : dataTableMap.values()) {
- DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
- assert customObject != null;
- DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
- if (!distinctTable.isEmpty()) {
- nonEmptyDistinctTables.add(distinctTable);
+ // Do not use the cached data schema because it might be either single
object (legacy) or normal data table
+ dataSchema = dataTable.getDataSchema();
+ int numColumns = dataSchema.size();
+ if (numColumns == 1 && dataSchema.getColumnDataType(0) ==
ColumnDataType.OBJECT) {
+ // DistinctTable is still being returned as a single object
+ DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+ assert customObject != null;
+ DistinctTable distinctTable =
ObjectSerDeUtils.deserialize(customObject);
+ if (!distinctTable.isEmpty()) {
+ nonEmptyDistinctTables.add(distinctTable);
+ }
+ } else {
+ // DistinctTable is being returned as normal data table
+ int numRows = dataTable.getNumberOfRows();
+ if (numRows > 0) {
+ List<Record> records = new ArrayList<>(numRows);
+ if (_queryContext.isNullHandlingEnabled()) {
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ for (int coldId = 0; coldId < numColumns; coldId++) {
+ nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+ }
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ records.add(new Record(
+
SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable,
rowId, nullBitmaps)));
+ }
+ } else {
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ records.add(new
Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
+ }
+ }
+ nonEmptyDistinctTables.add(new DistinctTable(dataSchema, records));
+ }
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 649e377942..14262da2d1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -419,6 +419,24 @@ public class SelectionOperatorUtils {
return row;
}
+ /**
+ * Extract a selection row from {@link DataTable} with potential null
values. (Broker side)
+ *
+ * @param dataTable data table.
+ * @param rowId row id.
+ * @return selection row.
+ */
+ public static Object[] extractRowFromDataTableWithNullHandling(DataTable
dataTable, int rowId,
+ RoaringBitmap[] nullBitmaps) {
+ Object[] row = extractRowFromDataTable(dataTable, rowId);
+ for (int colId = 0; colId < nullBitmaps.length; colId++) {
+ if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
+ row[colId] = null;
+ }
+ }
+ return row;
+ }
+
/**
* Reduces a collection of {@link DataTable}s to selection rows for
selection queries without <code>ORDER BY</code>.
* (Broker side)
@@ -436,13 +454,7 @@ public class SelectionOperatorUtils {
}
for (int rowId = 0; rowId < numRows; rowId++) {
if (rows.size() < limit) {
- Object[] row = extractRowFromDataTable(dataTable, rowId);
- for (int colId = 0; colId < numColumns; colId++) {
- if (nullBitmaps[colId] != null &&
nullBitmaps[colId].contains(rowId)) {
- row[colId] = null;
- }
- }
- rows.add(row);
+ rows.add(extractRowFromDataTableWithNullHandling(dataTable, rowId,
nullBitmaps));
} else {
break;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
index 75fb625a94..eaf9d3fbe3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
@@ -21,14 +21,11 @@ package org.apache.pinot.core.operator.blocks.results;
import java.io.IOException;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
public class ResultsBlockUtilsTest {
@@ -74,15 +71,9 @@ public class ResultsBlockUtilsTest {
queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT
a, b FROM testTable WHERE foo = 'bar'");
dataTable =
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
dataSchema = dataTable.getDataSchema();
- assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
- assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.OBJECT});
- assertEquals(dataTable.getNumberOfRows(), 1);
- DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
- assertNotNull(customObject);
- DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
- assertEquals(distinctTable.size(), 0);
- assertEquals(distinctTable.getDataSchema().getColumnNames(), new
String[]{"a", "b"});
- assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
+ assertEquals(dataSchema.getColumnNames(), new String[]{"a", "b"});
+ assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.STRING});
+ assertEquals(dataTable.getNumberOfRows(), 0);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 8169fc9121..8f549bdaa5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -80,13 +80,13 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
}
}
- // FIXME: there's a bug where singletonInstance may be null in the case
of a JOIN where
- // one side is BROADCAST and the other is SINGLETON (this is the case
with nested loop
- // joins for inequality conditions). This causes NPEs in the logs, but
actually works
- // because the side that hits the NPE doesn't expect to get any data
anyway (that's the
- // side that gets the broadcast from one side but nothing from the
SINGLETON)
- // FIXME: https://github.com/apache/pinot/issues/9592
- _sendingStageInstances = Collections.singletonList(singletonInstance);
+ if (singletonInstance == null) {
+ // TODO: fix WorkerManager assignment, this should not happen if we
properly assign workers.
+ // see: https://github.com/apache/pinot/issues/9592
+ _sendingStageInstances = Collections.emptyList();
+ } else {
+ _sendingStageInstances = Collections.singletonList(singletonInstance);
+ }
} else {
_sendingStageInstances = sendingStageInstances;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3e358ccc2c..db3bbe8610 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -89,6 +89,7 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
singletonInstance = serverInstance;
}
}
+ Preconditions.checkNotNull(singletonInstance, "Unable to find receiving
instance for singleton exchange");
_receivingStageInstances = Collections.singletonList(singletonInstance);
} else {
_receivingStageInstances = receivingStageInstances;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 36b445b310..2e68c601ab 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
+import
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -69,7 +70,10 @@ import
org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
public class ServerRequestUtils {
private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
- ImmutableList.of(PredicateComparisonRewriter.class.getName());
+ ImmutableList.of(
+ PredicateComparisonRewriter.class.getName(),
+ NonAggregationGroupByToDistinctQueryRewriter.class.getName()
+ );
private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
@@ -182,12 +186,12 @@ public class ServerRequestUtils {
pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
((ProjectNode) node).getProjects(), pinotQuery));
} else if (node instanceof AggregateNode) {
- // set agg list
-
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getSelectList(),
- ((AggregateNode) node).getAggCalls(), pinotQuery));
// set group-by list
pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
((AggregateNode) node).getGroupSet(), pinotQuery));
+ // set agg list
+
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getGroupByList(),
+ ((AggregateNode) node).getAggCalls(), pinotQuery));
} else if (node instanceof SortNode) {
if (((SortNode) node).getCollationKeys().size() > 0) {
pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode)
node).getCollationKeys(),
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
index 6540c9c515..0456872a38 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
@@ -190,6 +190,9 @@ public class QueryTestSet {
// - distinct value done via GROUP BY with empty expr aggregation
list.
new Object[]{"SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 "
+ " WHERE b.col3 > 0 GROUP BY a.col2, a.col3"},
+ new Object[]{"SELECT col3 FROM a GROUP BY col3, col1"},
+ new Object[]{"SELECT col1 FROM a GROUP BY col3, col1"},
+ new Object[]{"SELECT AVG(col3) FROM (SELECT col1, col3 FROM a WHERE
col3 > 1 GROUP BY col1, col3)"},
// Test optimized constant literal.
new Object[]{"SELECT col1 FROM a WHERE col3 > 0 AND col3 < -5"},
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 06fd707a26..1d03df946f 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -119,7 +119,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
} else if (l instanceof String) {
return ((String) l).compareTo((String) r);
} else {
- throw new RuntimeException("non supported type");
+ throw new RuntimeException("non supported type " + l.getClass());
}
};
Comparator<Object[]> rowComp = (l, r) -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]