This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d062cfd Add InIdSetTransformFunction (#5973)
d062cfd is described below
commit d062cfde5a7093a59d19240a1a678dea500279bf
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Sep 8 18:15:05 2020 -0700
Add InIdSetTransformFunction (#5973)
Add `InIdSetTransformFunction` to support filtering with an `IdSet`.
Example query:
`SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID,
'AgAAAAABAAAAADowAAABAAAAAAADABAAAAAAAOpMg0+zUg==') = 1`
(`AgAAAAABAAAAADowAAABAAAAAAADABAAAAAAAOpMg0+zUg==` is the base64 encoded
IdSet)
---
.../common/function/TransformFunctionType.java | 1 +
.../function/InIdSetTransformFunction.java | 132 +++++++++++++++++++++
.../function/TransformFunctionFactory.java | 1 +
.../org/apache/pinot/queries/IdSetQueriesTest.java | 45 +++++++
.../tests/BaseClusterIntegrationTestSet.java | 22 +++-
5 files changed, 198 insertions(+), 3 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index 76f6446..1347f1d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -55,6 +55,7 @@ public enum TransformFunctionType {
ARRAYLENGTH("arrayLength"),
VALUEIN("valueIn"),
MAPVALUE("mapValue"),
+ INIDSET("inIdSet"),
GROOVY("groovy"),
// Special type for annotation based scalar functions
SCALAR("scalar"),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/InIdSetTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/InIdSetTransformFunction.java
new file mode 100644
index 0000000..bff7f36
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/InIdSetTransformFunction.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.query.utils.idset.IdSet;
+import org.apache.pinot.core.query.utils.idset.IdSets;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * The IN_ID_SET transform function takes 2 arguments:
+ * <ul>
+ * <li>Expression: a single-value expression</li>
+ * <li>Base64 encoded IdSet: a literal string</li>
+ * </ul>
+ * <p>For each docId, the function returns {@code 1} if the IdSet contains the
value of the expression, {code 0} if not.
+ * <p>E.g. {@code SELECT COUNT(*) FROM myTable WHERE IN_ID_SET(col, '<base64
encoded IdSet>') = 1)}
+ */
+public class InIdSetTransformFunction extends BaseTransformFunction {
+ private TransformFunction _transformFunction;
+ private IdSet _idSet;
+ private int[] _results;
+
+ @Override
+ public String getName() {
+ return TransformFunctionType.INIDSET.getName();
+ }
+
+ @Override
+ public void init(List<TransformFunction> arguments, Map<String, DataSource>
dataSourceMap) {
+ Preconditions.checkArgument(arguments.size() == 2,
+ "2 arguments are required for IN_ID_SET transform function:
expression, base64 encoded IdSet");
+
Preconditions.checkArgument(arguments.get(0).getResultMetadata().isSingleValue(),
+ "First argument for IN_ID_SET transform function must be a
single-value expression");
+ Preconditions.checkArgument(arguments.get(1) instanceof
LiteralTransformFunction,
+ "Second argument for IN_ID_SET transform function must be a literal
string of the base64 encoded IdSet");
+
+ _transformFunction = arguments.get(0);
+ try {
+ _idSet = IdSets.fromBase64String(((LiteralTransformFunction)
arguments.get(1)).getLiteral());
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Caught exception while deserializing
IdSet", e);
+ }
+ }
+
+ @Override
+ public TransformResultMetadata getResultMetadata() {
+ return INT_SV_NO_DICTIONARY_METADATA;
+ }
+
+ @Override
+ public Dictionary getDictionary() {
+ return null;
+ }
+
+ @Override
+ public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+ if (_results == null) {
+ _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ int length = projectionBlock.getNumDocs();
+ DataType dataType = _transformFunction.getResultMetadata().getDataType();
+ switch (dataType) {
+ case INT:
+ int[] intValues =
_transformFunction.transformToIntValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(intValues[i]) ? 1 : 0;
+ }
+ break;
+ case LONG:
+ long[] longValues =
_transformFunction.transformToLongValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(longValues[i]) ? 1 : 0;
+ }
+ break;
+ case FLOAT:
+ float[] floatValues =
_transformFunction.transformToFloatValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(floatValues[i]) ? 1 : 0;
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues =
_transformFunction.transformToDoubleValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(doubleValues[i]) ? 1 : 0;
+ }
+ break;
+ case STRING:
+ String[] stringValues =
_transformFunction.transformToStringValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(stringValues[i]) ? 1 : 0;
+ }
+ break;
+ case BYTES:
+ byte[][] bytesValues =
_transformFunction.transformToBytesValuesSV(projectionBlock);
+ for (int i = 0; i < length; i++) {
+ _results[i] = _idSet.contains(bytesValues[i]) ? 1 : 0;
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ return _results;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 2e89188..e70e5ed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -90,6 +90,7 @@ public class TransformFunctionFactory {
put(TransformFunctionType.ARRAYLENGTH.getName().toLowerCase(),
ArrayLengthTransformFunction.class);
put(TransformFunctionType.VALUEIN.getName().toLowerCase(),
ValueInTransformFunction.class);
put(TransformFunctionType.MAPVALUE.getName().toLowerCase(),
MapValueTransformFunction.class);
+ put(TransformFunctionType.INIDSET.getName().toLowerCase(),
InIdSetTransformFunction.class);
put(TransformFunctionType.GROOVY.getName().toLowerCase(),
GroovyTransformFunction.class);
put(TransformFunctionType.CASE.getName().toLowerCase(),
CaseTransformFunction.class);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/IdSetQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/IdSetQueriesTest.java
index 480b208..0ee4f0e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/IdSetQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/IdSetQueriesTest.java
@@ -42,6 +42,7 @@ import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.utils.idset.BloomFilterIdSet;
import org.apache.pinot.core.query.utils.idset.EmptyIdSet;
+import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.core.query.utils.idset.Roaring64NavigableMapIdSet;
import org.apache.pinot.core.query.utils.idset.RoaringBitmapIdSet;
@@ -392,6 +393,50 @@ public class IdSetQueriesTest extends BaseQueriesTest {
}
}
+ @Test
+ public void testInIdSet()
+ throws IOException {
+ // Create an IdSet with the values from the first half records
+ IdSet idSet = IdSets.create(DataType.INT);
+ for (int i = 0; i < NUM_RECORDS / 2; i++) {
+ idSet.add(_values[i]);
+ }
+ String serializedIdSet = idSet.toBase64String();
+
+ // Calculate the expected number of matching records
+ int expectedNumMatchingRecords = 0;
+ for (int value : _values) {
+ if (idSet.contains(value)) {
+ expectedNumMatchingRecords++;
+ }
+ }
+
+ {
+ String query = "SELECT COUNT(*) FROM testTable where INIDSET(intColumn,
'" + serializedIdSet + "') = 1";
+ AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
+ IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+ QueriesTestUtils
+
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
expectedNumMatchingRecords,
+ NUM_RECORDS, 0, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
+ assertEquals((long) aggregationResult.get(0),
expectedNumMatchingRecords);
+ }
+
+ {
+ String query = "SELECT COUNT(*) FROM testTable where INIDSET(intColumn,
'" + serializedIdSet + "') = 0";
+ AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
+ IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
+ NUM_RECORDS - expectedNumMatchingRecords, NUM_RECORDS, 0,
NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
+ assertEquals((long) aggregationResult.get(0), NUM_RECORDS -
expectedNumMatchingRecords);
+ }
+ }
+
@AfterClass
public void tearDown()
throws IOException {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index cd18431..a524047 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -33,6 +33,8 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.query.utils.idset.IdSet;
+import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -52,7 +54,6 @@ import static org.testng.Assert.assertTrue;
* Shared set of common tests for cluster integration tests.
* <p>To enable the test, override it and add @Test annotation.
*/
-@SuppressWarnings("unused")
public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrationTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseClusterIntegrationTestSet.class);
private static final Random RANDOM = new Random();
@@ -186,11 +187,11 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
testSqlQuery(query, Collections.singletonList(query));
query =
"SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND
DivAirportSeqIDs IN (1078102, 1142303, 1530402, 1172102, 1291503) OR
SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
- h2queries = Arrays.asList(
+ h2queries = Collections.singletonList(
"SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND
(DivAirportSeqIDs__MV0 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR
DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR
DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR
DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR
DivAirportSeqIDs__MV4 IN (1078102, 1142303, 1530402, 1172102, 1291503)) OR
SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000");
testSqlQuery(query, h2queries);
query = "SELECT MAX(Quarter), MAX(FlightNum) FROM mytable LIMIT 8";
- h2queries = Arrays.asList("SELECT MAX(Quarter),MAX(FlightNum) FROM mytable
LIMIT 10000");
+ h2queries = Collections.singletonList("SELECT MAX(Quarter),MAX(FlightNum)
FROM mytable LIMIT 10000");
testSqlQuery(query, h2queries);
query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch = 16312 AND
Carrier = 'DL'";
testSqlQuery(query, Collections.singletonList(query));
@@ -251,6 +252,21 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
query =
"SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM
mytable GROUP BY DaysSinceEpoch HAVING (Diff >= 300 AND Diff < 500) OR Diff <
-500 ORDER BY Diff DESC";
testSqlQuery(query, Collections.singletonList(query));
+
+ // IN_ID_SET
+ IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
+ idSet.add(19690L);
+ idSet.add(20355L);
+ idSet.add(21171L);
+ // Also include a non-existing id
+ idSet.add(0L);
+ String serializedIdSet = idSet.toBase64String();
+ String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE
INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
+ String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690,
20355, 21171, 0)";
+ testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
+ String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE
INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
+ String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN
(19690, 20355, 21171, 0)";
+ testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]