This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 33fa27704f ExpressionFilterOperator IS NULL and IS NOT NULL support.
(#11249)
33fa27704f is described below
commit 33fa27704f04076998f6a34c5199207989100794
Author: Shen Yu <[email protected]>
AuthorDate: Thu Aug 3 22:23:19 2023 -0700
ExpressionFilterOperator IS NULL and IS NOT NULL support. (#11249)
---
.../ExpressionScanDocIdIterator.java | 26 ++-
.../operator/docidsets/ExpressionDocIdSet.java | 3 +-
.../operator/filter/ExpressionFilterOperator.java | 39 ++++-
.../org/apache/pinot/core/plan/FilterPlanNode.java | 3 +-
.../queries/NullHandlingEnabledQueriesTest.java | 191 +++++++++++++++++++++
5 files changed, 247 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 0bec70df69..f97d0ccd57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
@@ -64,9 +65,9 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
// the expression, but we only track the number of entries scanned for
the resolved expression.
private long _numEntriesScanned = 0L;
- public ExpressionScanDocIdIterator(TransformFunction transformFunction,
PredicateEvaluator predicateEvaluator,
- Map<String, DataSource> dataSourceMap, int numDocs, boolean
nullHandlingEnabled,
- PredicateEvaluationResult predicateEvaluationResult) {
+ public ExpressionScanDocIdIterator(TransformFunction transformFunction,
+ @Nullable PredicateEvaluator predicateEvaluator, Map<String, DataSource>
dataSourceMap, int numDocs,
+ boolean nullHandlingEnabled, PredicateEvaluationResult
predicateEvaluationResult) {
_transformFunction = transformFunction;
_predicateEvaluator = predicateEvaluator;
_dataSourceMap = dataSourceMap;
@@ -146,10 +147,20 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
private void processProjectionBlock(ProjectionBlock projectionBlock,
BitmapDataProvider matchingDocIds) {
int numDocs = projectionBlock.getNumDocs();
TransformResultMetadata resultMetadata =
_transformFunction.getResultMetadata();
- boolean predicateEvaluationResult = _predicateEvaluationResult ==
PredicateEvaluationResult.TRUE;
if (resultMetadata.isSingleValue()) {
_numEntriesScanned += numDocs;
RoaringBitmap nullBitmap = null;
+ if (_predicateEvaluationResult == PredicateEvaluationResult.NULL) {
+ nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+ if (nullBitmap != null) {
+ for (int i : nullBitmap) {
+ matchingDocIds.add(_docIdBuffer[i]);
+ }
+ }
+ return;
+ }
+ boolean predicateEvaluationResult = _predicateEvaluationResult ==
PredicateEvaluationResult.TRUE;
+ assert (_predicateEvaluator != null);
if (resultMetadata.hasDictionary()) {
int[] dictIds =
_transformFunction.transformToDictIdsSV(projectionBlock);
if (_nullHandlingEnabled) {
@@ -315,6 +326,11 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
}
} else {
// TODO(https://github.com/apache/pinot/issues/10882): support NULL for
multi-value.
+ if (_predicateEvaluationResult == PredicateEvaluationResult.NULL) {
+ return;
+ }
+ boolean predicateEvaluationResult = _predicateEvaluationResult ==
PredicateEvaluationResult.TRUE;
+ assert (_predicateEvaluator != null);
if (resultMetadata.hasDictionary()) {
int[][] dictIdsArray =
_transformFunction.transformToDictIdsMV(projectionBlock);
for (int i = 0; i < numDocs; i++) {
@@ -429,6 +445,6 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
}
public enum PredicateEvaluationResult {
- TRUE, FALSE
+ TRUE, NULL, FALSE
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
index 00c174d4a6..d9fdcfbbcb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.docidsets;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockDocIdSet;
import
org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
@@ -29,7 +30,7 @@ import org.apache.pinot.segment.spi.datasource.DataSource;
public final class ExpressionDocIdSet implements BlockDocIdSet {
private final ExpressionScanDocIdIterator _docIdIterator;
- public ExpressionDocIdSet(TransformFunction transformFunction,
PredicateEvaluator predicateEvaluator,
+ public ExpressionDocIdSet(TransformFunction transformFunction, @Nullable
PredicateEvaluator predicateEvaluator,
Map<String, DataSource> dataSourceMap, int numDocs, boolean
nullHandlingEnabled,
ExpressionScanDocIdIterator.PredicateEvaluationResult
predicateEvaluationResult) {
_docIdIterator = new ExpressionScanDocIdIterator(transformFunction,
predicateEvaluator, dataSourceMap, numDocs,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
index 4338dd8dee..7c0fe3b829 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
@@ -32,6 +32,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.ColumnContext;
import
org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator;
import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet;
+import org.apache.pinot.core.operator.docidsets.NotDocIdSet;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
@@ -47,6 +48,7 @@ public class ExpressionFilterOperator extends
BaseFilterOperator {
private final QueryContext _queryContext;
private final Map<String, DataSource> _dataSourceMap;
private final TransformFunction _transformFunction;
+ private final Predicate.Type _predicateType;
private final PredicateEvaluator _predicateEvaluator;
public ExpressionFilterOperator(IndexSegment segment, QueryContext
queryContext, Predicate predicate, int numDocs) {
@@ -65,21 +67,44 @@ public class ExpressionFilterOperator extends
BaseFilterOperator {
columnContextMap.put(column, ColumnContext.fromDataSource(dataSource));
});
_transformFunction = TransformFunctionFactory.get(lhs, columnContextMap,
_queryContext);
- _predicateEvaluator =
- PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
_transformFunction.getDictionary(),
- _transformFunction.getResultMetadata().getDataType());
+ _predicateType = predicate.getType();
+ if (_predicateType == Predicate.Type.IS_NULL || _predicateType ==
Predicate.Type.IS_NOT_NULL) {
+ _predicateEvaluator = null;
+ } else {
+ _predicateEvaluator =
+ PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
_transformFunction.getDictionary(),
+ _transformFunction.getResultMetadata().getDataType());
+ }
}
@Override
protected BlockDocIdSet getTrues() {
- return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator,
_dataSourceMap, _numDocs,
- _queryContext.isNullHandlingEnabled(),
ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE);
+ if (_predicateType == Predicate.Type.IS_NULL) {
+ return getNulls();
+ } else if (_predicateType == Predicate.Type.IS_NOT_NULL) {
+ return new NotDocIdSet(getNulls(), _numDocs);
+ } else {
+ return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator,
_dataSourceMap, _numDocs,
+ _queryContext.isNullHandlingEnabled(),
ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE);
+ }
+ }
+
+ @Override
+ protected BlockDocIdSet getNulls() {
+ return new ExpressionDocIdSet(_transformFunction, null, _dataSourceMap,
_numDocs,
+ _queryContext.isNullHandlingEnabled(),
ExpressionScanDocIdIterator.PredicateEvaluationResult.NULL);
}
@Override
protected BlockDocIdSet getFalses() {
- return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator,
_dataSourceMap, _numDocs,
- _queryContext.isNullHandlingEnabled(),
ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE);
+ if (_predicateType == Predicate.Type.IS_NULL) {
+ return new NotDocIdSet(getNulls(), _numDocs);
+ } else if (_predicateType == Predicate.Type.IS_NOT_NULL) {
+ return getNulls();
+ } else {
+ return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator,
_dataSourceMap, _numDocs,
+ _queryContext.isNullHandlingEnabled(),
ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE);
+ }
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index af30f18e8f..6eafcbf170 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -241,8 +241,7 @@ public class FilterPlanNode implements PlanNode {
} else if (canApplyH3IndexForInclusionCheck(predicate,
lhs.getFunction())) {
return new H3InclusionIndexFilterOperator(_indexSegment,
_queryContext, predicate, numDocs);
} else {
- // TODO: ExpressionFilterOperator does not support predicate types
without PredicateEvaluator (IS_NULL,
- // IS_NOT_NULL, TEXT_MATCH)
+ // TODO: ExpressionFilterOperator does not support predicate types
without PredicateEvaluator (TEXT_MATCH)
return new ExpressionFilterOperator(_indexSegment, _queryContext,
predicate, numDocs);
}
} else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
index 10bad7a232..ef9386a955 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
@@ -897,4 +897,195 @@ public class NullHandlingEnabledQueriesTest extends
BaseQueriesTest {
List<Object[]> rows = resultTable.getRows();
assertEquals(rows.size(), 0);
}
+
+ @Test
+ public void testExpressionFilterOperatoIsNullPredicate()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(null, 1);
+ insertRowWithTwoColumns(1, 2);
+ insertRowWithTwoColumns(-1, 3);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE ADD(%s, 0) IS NULL
LIMIT 100", COLUMN1, COLUMN2, COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{null, 1});
+ }
+
+ @Test
+ public void testExpressionFilterOperatorIsNotNullPredicate()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(null, 1);
+ insertRowWithTwoColumns(null, 2);
+ insertRowWithTwoColumns(1, 3);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE ADD(%s, 0) IS NOT
NULL LIMIT 100", COLUMN1, COLUMN2, COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{1, 3});
+ }
+
+ @Test
+ public void
testExpressionFilterOperatorIsNullPredicateInsideNotFilterOperator()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(null, 1);
+ insertRowWithTwoColumns(null, 2);
+ insertRowWithTwoColumns(1, 3);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE NOT(ADD(%s, 0) IS
NULL) LIMIT 100", COLUMN1, COLUMN2,
+ COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{1, 3});
+ }
+
+ @Test
+ public void
testExpressionFilterOperatorIsNotNullPredicateInsideNotFilterOperator()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(null, 1);
+ insertRowWithTwoColumns(2, 3);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE NOT(ADD(%s, 0) IS
NOT NULL) LIMIT 100", COLUMN1, COLUMN2,
+ COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{null, 1});
+ }
+
+ @Test
+ public void testExpressionFilterOperatorApplyIsNullPredicateToNotOfColumn()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(true, 1);
+ insertRowWithTwoColumns(null, 2);
+ insertRowWithTwoColumns(false, 3);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1,
FieldSpec.DataType.BOOLEAN)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE (NOT %s) IS NULL
LIMIT 100", COLUMN1, COLUMN2, COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{null, 2});
+ }
+
+ @Test
+ public void testExpressionFilterOperatorApplyAndForGetNulls()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(Integer.MIN_VALUE, null);
+ insertRowWithTwoColumns(1, null);
+ insertRowWithTwoColumns(-1, 1);
+ insertRowWithTwoColumns(null, null);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT %s, %s FROM testTable WHERE (add(%s, 0) IS NULL)
AND (%s IS NULL)", COLUMN1, COLUMN2,
+ COLUMN1, COLUMN2);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{null, null});
+ }
+
+ @Test
+ public void testExpressionFilterOperatorOnMultiValue()
+ throws Exception {
+ initializeRows();
+ insertRowWithTwoColumns(new Integer[]{1, 2, 3}, 1);
+ insertRowWithTwoColumns(new Integer[]{2, 3, 4}, null);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1,
FieldSpec.DataType.INT)
+ .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query =
+ String.format("SELECT * FROM testTable WHERE (VALUEIN(%s, 2, 3) IN (2,
3)) AND (%s = 1)", COLUMN1, COLUMN2);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{new Integer[]{1, 2, 3}, 1});
+ }
+
+ @Test
+ public void testExpressionFilterOperatorMultiValueIsNull()
+ throws Exception {
+ initializeRows();
+ insertRow(new Integer[]{1, 2, 3});
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1,
FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query = String.format("SELECT * FROM testTable WHERE (VALUEIN(%s,
2, 3) IS NULL)", COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 0);
+ }
+
+ @Test
+ public void testExpressionFilterOperatorMultiValueIsNotNull()
+ throws Exception {
+ initializeRows();
+ insertRow(new Integer[]{1, 2, 3});
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1,
FieldSpec.DataType.INT).build();
+ setUpSegments(tableConfig, schema);
+ String query = String.format("SELECT * FROM testTable WHERE (VALUEIN(%s,
2, 3) IS NOT NULL)", COLUMN1);
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query,
QUERY_OPTIONS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+ assertArrayEquals(rows.get(0), new Object[]{new Integer[]{1, 2, 3}});
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]