This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 659fe7c68a Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
659fe7c68a is described below
commit 659fe7c68a62d553e5ba24dfe996cc18f5a3bcea
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Aug 1 23:25:34 2023 -0700
Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
---
.../tests/MultiStageEngineIntegrationTest.java | 45 +++++++++++--
.../tests/SumPrecisionIntegrationTest.java | 14 ----
...onTest.java => TupleSketchIntegrationTest.java} | 78 ++++++++--------------
.../pinot/segment/spi/AggregationFunctionType.java | 34 +++++++---
4 files changed, 92 insertions(+), 79 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index bd12f0901b..e4d149a547 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -128,6 +128,43 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
testQueryWithMatchingRowCount(pinotQuery, h2Query);
}
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testDistinctCountQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String[] numericResultFunctions = new String[]{
+ "distinctCount", "distinctCountBitmap", "distinctCountHLL",
"segmentPartitionedDistinctCount",
+ "distinctCountSmartHLL", "distinctCountThetaSketch", "distinctSum",
"distinctAvg"
+ };
+
+ double[] expectedNumericResults = new double[]{
+ 364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
+ };
+ Assert.assertEquals(numericResultFunctions.length, expectedNumericResults.length);
+
+ for (int i = 0; i < numericResultFunctions.length; i++) {
+ String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", numericResultFunctions[i]);
+ JsonNode jsonNode = postQuery(pinotQuery);
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(), expectedNumericResults[i]);
+ }
+
+ String[] binaryResultFunctions = new String[]{
+ "distinctCountRawHLL", "distinctCountRawThetaSketch"
+ };
+ int[] expectedBinarySizeResults = new int[]{
+ 360,
+ 3904
+ };
+ for (int i = 0; i < binaryResultFunctions.length; i++) {
+ String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", binaryResultFunctions[i]);
+ JsonNode jsonNode = postQuery(pinotQuery);
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
+ expectedBinarySizeResults[i]);
+ }
+
+ setUseMultiStageQueryEngine(true);
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testMultiValueColumnAggregationQuery(boolean useMultiStageQueryEngine)
throws Exception {
@@ -166,13 +203,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99) FROM mytable";
jsonNode = postQuery(pinotQuery);
- Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
- Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+ Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+ Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99, 100) FROM mytable";
jsonNode = postQuery(pinotQuery);
- Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
- Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+ Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+ Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
setUseMultiStageQueryEngine(true);
}
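
[Note] A minimal sketch of exercising one of the newly supported v2 aggregations the same way the test above does; setUseMultiStageQueryEngine and postQuery are the existing helpers from the test base class, and the expected value 364 is the distinct DaysSinceEpoch count asserted in the new test:

    // Hedged sketch: reuses the helpers shown in the diff above.
    setUseMultiStageQueryEngine(true);  // route the query through the v2 (multi-stage) engine
    JsonNode result = postQuery("SELECT distinctCountBitmap(DaysSinceEpoch) FROM mytable");
    // 364 is the distinct DaysSinceEpoch count in the default test data set.
    Assert.assertEquals(result.get("resultTable").get("rows").get(0).get(0).asDouble(), 364.0);
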
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
index 8b8a8ab41f..f3763dd3e3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
@@ -24,8 +24,6 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
@@ -104,18 +102,6 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
}
}
- private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
- throws Exception {
- Map<String, Integer> actualGroupToValueMap = new HashMap<>();
- JsonNode jsonNode = postQuery(query);
- jsonNode.get("resultTable").get("rows").forEach(node -> {
- String group = node.get(0).textValue();
- int value = node.get(1).intValue();
- actualGroupToValueMap.put(group, value);
- });
- assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
- }
-
private File createAvroFile(long totalNumRecords)
throws IOException {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
similarity index 57%
copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
index 8b8a8ab41f..e0e05d1075 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
@@ -22,22 +22,21 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Random;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -45,14 +44,13 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
-public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
- private static final String DIM_NAME = "dimName";
- private static final String MET_BIG_DECIMAL_BYTES = "metBigDecimalBytes";
- private static final String MET_BIG_DECIMAL_STRING = "metBigDecimalString";
- private static final String MET_DOUBLE = "metDouble";
- private static final String MET_LONG = "metLong";
+public class TupleSketchIntegrationTest extends BaseClusterIntegrationTest {
+ private static final String MET_TUPLE_SKETCH_BYTES = "metTupleSketchBytes";
+
+ private static final Random RANDOM = new Random();
@BeforeClass
public void setup()
@@ -67,11 +65,8 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
// create & upload schema AND table config
Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
- .addSingleValueDimension(DIM_NAME, FieldSpec.DataType.STRING)
- .addMetric(MET_BIG_DECIMAL_BYTES, FieldSpec.DataType.BIG_DECIMAL)
- .addMetric(MET_BIG_DECIMAL_STRING, FieldSpec.DataType.BIG_DECIMAL)
- .addMetric(MET_DOUBLE, FieldSpec.DataType.DOUBLE)
- .addMetric(MET_LONG, FieldSpec.DataType.LONG).build();
+ .addMetric(MET_TUPLE_SKETCH_BYTES, FieldSpec.DataType.BYTES)
+ .build();
addSchema(schema);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
addTableConfig(tableConfig);
@@ -94,26 +89,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query =
- String.format("SELECT SUMPRECISION(%s), SUMPRECISION(%s), sum(%s),
sum(%s) FROM %s",
- MET_BIG_DECIMAL_BYTES, MET_BIG_DECIMAL_STRING, MET_DOUBLE,
MET_LONG, DEFAULT_TABLE_NAME);
- double sumResult = 2147484147500L;
- JsonNode jsonNode = postQuery(query);
- System.out.println("jsonNode = " + jsonNode.toPrettyString());
- for (int i = 0; i < 4; i++) {
- assertEquals(Double.parseDouble(jsonNode.get("resultTable").get("rows").get(0).get(i).asText()), sumResult);
- }
- }
-
- private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
- throws Exception {
- Map<String, Integer> actualGroupToValueMap = new HashMap<>();
+ String.format(
+ "SELECT DISTINCT_COUNT_TUPLE_SKETCH(%s),
DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(%s), "
+ + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(%s),
AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(%s) FROM %s",
+ MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+ DEFAULT_TABLE_NAME);
JsonNode jsonNode = postQuery(query);
- jsonNode.get("resultTable").get("rows").forEach(node -> {
- String group = node.get(0).textValue();
- int value = node.get(1).intValue();
- actualGroupToValueMap.put(group, value);
- });
- assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong() > 0);
+ assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asText().length(), 1756);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(2).asLong() > 0);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong() > 0);
}
private File createAvroFile(long totalNumRecords)
@@ -122,29 +107,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(ImmutableList.of(
- new Field(DIM_NAME, org.apache.avro.Schema.create(Type.STRING), null, null),
- new Field(MET_BIG_DECIMAL_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null),
- new Field(MET_BIG_DECIMAL_STRING, org.apache.avro.Schema.create(Type.STRING), null, null),
- new Field(MET_DOUBLE, org.apache.avro.Schema.create(Type.DOUBLE), null, null),
- new Field(MET_LONG, org.apache.avro.Schema.create(Type.LONG), null, null)));
+ new Field(MET_TUPLE_SKETCH_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null)));
// create avro file
File avroFile = new File(_tempDir, "data.avro");
try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
fileWriter.create(avroSchema, avroFile);
- int dimCardinality = 50;
- BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
for (int i = 0; i < totalNumRecords; i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(DIM_NAME, "dim" + (RandomUtils.nextInt() % dimCardinality));
- BigDecimal bigDecimalValue = bigDecimalBase.add(BigDecimal.valueOf(i));
-
- record.put(MET_BIG_DECIMAL_BYTES, ByteBuffer.wrap(BigDecimalUtils.serialize(bigDecimalValue)));
- record.put(MET_BIG_DECIMAL_STRING, bigDecimalValue.toPlainString());
- record.put(MET_DOUBLE, bigDecimalValue.doubleValue());
- record.put(MET_LONG, bigDecimalValue.longValue());
-
+ record.put(MET_TUPLE_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
// add avro record to file
fileWriter.append(record);
}
@@ -152,6 +124,12 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
return avroFile;
}
+ private byte[] getRandomRawValue() {
+ IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum);
+ is.update(RANDOM.nextInt(100), RANDOM.nextInt(100));
+ return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
+ }
+
@AfterClass
public void tearDown()
throws IOException {
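
[Note] A minimal standalone sketch of what getRandomRawValue() above produces per row: each Avro record carries one serialized Integer Tuple Sketch with a single (key, value) update, which is why the new assertions expect strictly positive distinct-count, sum, and avg results. IntegerSketch, IntegerSummary, and DATA_SKETCH_INT_TUPLE_SER_DE are the same classes used in the diff; the fixed key/value and the getEstimate() read-back are illustrative assumptions, not part of this commit:

    // Hedged sketch of the per-row payload.
    IntegerSketch sketch = new IntegerSketch(4, IntegerSummary.Mode.Sum);
    sketch.update(42, 7);                               // hypothetical key/value instead of the random ones above
    double estimate = sketch.compact().getEstimate();   // ~1.0 for a single distinct key
    byte[] raw = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(sketch.compact());
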
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 77a9199727..fae66d1ded 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -80,14 +80,15 @@ public enum AggregationFunctionType {
* (1) distinct_count only supports single argument;
* (2) count(distinct ...) support multi-argument and will be converted into DISTINCT + COUNT
*/
- DISTINCTCOUNT("distinctCount", null, SqlKind.OTHER_FUNCTION,
+ DISTINCTCOUNT("distinctCount", ImmutableList.of("DISTINCT_COUNT"),
SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY,
ReturnTypes.BIGINT,
ReturnTypes.explicit(SqlTypeName.OTHER)),
- // TODO: support bitmap and segment partition in V2
- DISTINCTCOUNTBITMAP("distinctCountBitmap"),
- SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount", null, SqlKind.OTHER_FUNCTION,
+ DISTINCTCOUNTBITMAP("distinctCountBitmap", ImmutableList.of("DISTINCT_COUNT_BITMAP"), SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT,
- ReturnTypes.BIGINT),
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
+ SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount",
+ ImmutableList.of("SEGMENT_PARTITIONED_DISTINCT_COUNT"), SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT, ReturnTypes.BIGINT),
DISTINCTCOUNTHLL("distinctCountHLL", ImmutableList.of("DISTINCT_COUNT_HLL"), SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0),
@@ -101,6 +102,7 @@ public enum AggregationFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
+ @Deprecated
FASTHLL("fastHLL"),
DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
@@ -131,6 +133,7 @@ public enum AggregationFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
+ @Deprecated
PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
PERCENTILEKLL("percentileKLL", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
@@ -140,6 +143,7 @@ public enum AggregationFunctionType {
ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
+ @Deprecated
IDSET("idSet"),
// TODO: support histogram requires solving ARRAY constructor and multi-function signature without optional ordinal
@@ -167,17 +171,25 @@ public enum AggregationFunctionType {
OperandTypes.NUMERIC, ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
FOURTHMOMENT("fourthMoment"),
- // TODO: revisit support for Tuple sketches in V2
// DataSketches Tuple Sketch support
- DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"),
+ DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch", ImmutableList.of("DISTINCT_COUNT_TUPLE_SKETCH"),
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
// DataSketches Tuple Sketch support for Integer based Tuple Sketches
- DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"),
+ DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch",
+ ImmutableList.of("DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH"), SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.VARCHAR_2000,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
- SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"),
- AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"),
+ SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch",
+ ImmutableList.of("SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH"),
SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY,
ReturnTypes.BIGINT,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
+ AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch", ImmutableList.of("AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH"),
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
- // TODO: revisit support for Geo-spatial agg in V2
// Geo aggregation functions
STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), ReturnTypes.explicit(SqlTypeName.OTHER)),
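
[Note] A minimal sketch of what the added ImmutableList.of(...) name entries enable: the snake_case spellings now resolve to the same enum constants, which is how the queries in TupleSketchIntegrationTest can spell the functions as DISTINCT_COUNT_TUPLE_SKETCH and friends. The getAggregationFunctionType call below is assumed to be the enum's existing name-resolution helper (normalizing case and underscores), not something introduced by this commit:

    // Hedged sketch: both spellings should map to the same aggregation type.
    AggregationFunctionType camel = AggregationFunctionType.getAggregationFunctionType("distinctCountBitmap");
    AggregationFunctionType snake = AggregationFunctionType.getAggregationFunctionType("DISTINCT_COUNT_BITMAP");
    assert camel == AggregationFunctionType.DISTINCTCOUNTBITMAP && camel == snake;
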
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]