This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d211d89985 Adding tuple sketch scalar functions (#11517)
d211d89985 is described below
commit d211d899855881704931d8c83cfbbc152bb8f16f
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Sep 10 04:24:26 2023 -0700
Adding tuple sketch scalar functions (#11517)
Adding more sketch integration tests
---
.../core/function/scalar/SketchFunctions.java | 92 ++++++++++
.../integration/tests/custom/ThetaSketchTest.java | 67 +++++++
.../integration/tests/custom/TupleSketchTest.java | 198 +++++++++++++++++++++
3 files changed, 357 insertions(+)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 37244eff70..8b0b72d1d1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -32,6 +32,7 @@ import org.apache.datasketches.theta.Union;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.annotations.ScalarFunction;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -277,4 +278,95 @@ public class SketchFunctions {
+ sketchObj.getClass());
}
}
+
+ @ScalarFunction(names = {"intSumTupleSketchUnion",
"int_sum_tuple_sketch_union"})
+ public static byte[] intSumTupleSketchUnion(Object o1, Object o2) {
+ return intSumTupleSketchUnion((int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK), o1, o2);
+ }
+
+ @ScalarFunction(names = {"intSumTupleSketchUnion",
"int_sum_tuple_sketch_union"})
+ public static byte[] intSumTupleSketchUnion(int nomEntries, Object o1,
Object o2) {
+ return intTupleSketchUnionVar(IntegerSummary.Mode.Sum, nomEntries, o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMinTupleSketchUnion",
"int_min_tuple_sketch_union"})
+ public static byte[] intMinTupleSketchUnion(Object o1, Object o2) {
+ return intMinTupleSketchUnion((int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK), o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMinTupleSketchUnion",
"int_min_tuple_sketch_union"})
+ public static byte[] intMinTupleSketchUnion(int nomEntries, Object o1,
Object o2) {
+ return intTupleSketchUnionVar(IntegerSummary.Mode.Min, nomEntries, o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMaxTupleSketchUnion",
"int_max_tuple_sketch_union"})
+ public static byte[] intMaxTupleSketchUnion(Object o1, Object o2) {
+ return intMaxTupleSketchUnion((int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK), o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMaxTupleSketchUnion",
"int_max_tuple_sketch_union"})
+ public static byte[] intMaxTupleSketchUnion(int nomEntries, Object o1,
Object o2) {
+ return intTupleSketchUnionVar(IntegerSummary.Mode.Max, nomEntries, o1, o2);
+ }
+
+ private static byte[] intTupleSketchUnionVar(IntegerSummary.Mode mode, int
nomEntries, Object... sketchObjects) {
+ org.apache.datasketches.tuple.Union<IntegerSummary>
+ union = new org.apache.datasketches.tuple.Union<>(nomEntries,
+ new IntegerSummarySetOperations(mode, mode));
+ for (Object sketchObj : sketchObjects) {
+ union.union(asIntegerSketch(sketchObj));
+ }
+ return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(union.getResult().compact());
+ }
+
+ @ScalarFunction(names = {"intSumTupleSketchIntersect",
"int_sum_tuple_sketch_intersect"})
+ public static byte[] intSumTupleSketchIntersect(Object o1, Object o2) {
+ return intTupleSketchIntersectVar(IntegerSummary.Mode.Sum, o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMinTupleSketchIntersect",
"int_min_tuple_sketch_intersect"})
+ public static byte[] intMinTupleSketchIntersect(Object o1, Object o2) {
+ return intTupleSketchIntersectVar(IntegerSummary.Mode.Min, o1, o2);
+ }
+
+ @ScalarFunction(names = {"intMaxTupleSketchIntersect",
"int_max_tuple_sketch_intersect"})
+ public static byte[] intMaxTupleSketchIntersect(Object o1, Object o2) {
+ return intTupleSketchIntersectVar(IntegerSummary.Mode.Max, o1, o2);
+ }
+
+ private static byte[] intTupleSketchIntersectVar(IntegerSummary.Mode mode,
Object... sketchObjects) {
+ org.apache.datasketches.tuple.Intersection<IntegerSummary> intersection =
+ new org.apache.datasketches.tuple.Intersection<>(new
IntegerSummarySetOperations(mode, mode));
+ for (Object sketchObj : sketchObjects) {
+ intersection.intersect(asIntegerSketch(sketchObj));
+ }
+ return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(intersection.getResult().compact());
+ }
+
+ @ScalarFunction(names = {"intTupleSketchDiff", "int_tuple_sketch_diff"})
+ public static byte[] intSumTupleSketchDiff(Object o1, Object o2) {
+ org.apache.datasketches.tuple.AnotB<IntegerSummary> diff = new
org.apache.datasketches.tuple.AnotB<>();
+ diff.setA(asIntegerSketch(o1));
+ diff.notB(asIntegerSketch(o2));
+ return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(diff.getResult(false).compact());
+ }
+
+ private static org.apache.datasketches.tuple.Sketch<IntegerSummary>
asIntegerSketch(Object sketchObj) {
+ if (sketchObj instanceof String) {
+ byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+ return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(decoded);
+ } else if (sketchObj instanceof org.apache.datasketches.tuple.Sketch) {
+ return (org.apache.datasketches.tuple.Sketch<IntegerSummary>) sketchObj;
+ } else if (sketchObj instanceof byte[]) {
+ return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) sketchObj);
+ } else {
+ throw new RuntimeException("Exception occurred getting reading Tuple
Sketch, unsupported Object type: "
+ + sketchObj.getClass());
+ }
+ }
+
+ @ScalarFunction(names = {"getIntTupleSketchEstimate",
"get_int_tuple_sketch_estimate"})
+ public static long getIntTupleSketchEstimate(Object o1) {
+ return Math.round(asIntegerSketch(o1).getEstimate());
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index c6a96bca70..392ed8738c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -450,6 +450,73 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
ImmutableMap.of("Female", 50 + 60 + 70 + 110 + 120 + 130, "Male", 80
+ 90 + 100 + 140 + 150 + 160);
runAndAssert(query, expected);
}
+
+ // union all by gender
+ {
+ String query = "select dimValue,
distinctCountThetaSketch(thetaSketchCol) from "
+ + "( "
+ + "SELECT dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Female' "
+ + "UNION ALL "
+ + "SELECT dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Male' "
+ + ") "
+ + "GROUP BY dimValue";
+ ImmutableMap<String, Integer> expected =
+ ImmutableMap.of("Female", 50 + 60 + 70 + 110 + 120 + 130, "Male", 80
+ 90 + 100 + 140 + 150 + 160);
+ runAndAssert(query, expected);
+ }
+
+ // JOIN all by gender
+ {
+ String query = "select a.dimValue,
distinctCountThetaSketch(b.thetaSketchCol) "
+ + "FROM "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Female') a "
+ + "JOIN "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Male') b "
+ + "ON a.dimName = b.dimName "
+ + "GROUP BY a.dimValue";
+ ImmutableMap<String, Integer> expected =
+ ImmutableMap.of("Female", 80 + 90 + 100 + 140 + 150 + 160);
+ runAndAssert(query, expected);
+ }
+ {
+ String query = "select b.dimValue,
distinctCountThetaSketch(a.thetaSketchCol) "
+ + "FROM "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Female') a "
+ + "JOIN "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Male') b "
+ + "ON a.dimName = b.dimName "
+ + "GROUP BY b.dimValue";
+ ImmutableMap<String, Integer> expected =
+ ImmutableMap.of("Male", 50 + 60 + 70 + 110 + 120 + 130);
+ runAndAssert(query, expected);
+ }
+ {
+ String query = "SELECT "
+ + "GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT("
+ + " DISTINCT_COUNT_RAW_THETA_SKETCH(a.thetaSketchCol, ''), "
+ + " DISTINCT_COUNT_RAW_THETA_SKETCH(b.thetaSketchCol, ''))), "
+ + "GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION("
+ + " DISTINCT_COUNT_RAW_THETA_SKETCH(a.thetaSketchCol, ''), "
+ + " DISTINCT_COUNT_RAW_THETA_SKETCH(b.thetaSketchCol, ''))) "
+ + "FROM "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Female') a "
+ + "JOIN "
+ + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+ + " where dimName = 'gender' and dimValue = 'Male') b "
+ + "ON a.dimName = b.dimName";
+ JsonNode jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).longValue(),
+ 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).longValue(),
+ 50 + 60 + 70 + 110 + 120 + 130 + 80 + 90 + 100 + 140 + 150 + 160);
+ }
}
private void runAndAssert(String query, int expected)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index e90b244f11..bb9c175e25 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -26,9 +26,12 @@ import java.util.Base64;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.datasketches.tuple.Intersection;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -42,6 +45,7 @@ import static org.testng.Assert.assertTrue;
public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
private static final String DEFAULT_TABLE_NAME = "TupleSketchTest";
+ private static final String ID = "id";
private static final String MET_TUPLE_SKETCH_BYTES = "metTupleSketchBytes";
@Override
@@ -71,6 +75,196 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong()
> 0);
}
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testTupleUnionQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ for (int i = 0; i < 10; i++) {
+ String query = "SELECT "
+ + "DISTINCT_COUNT_TUPLE_SKETCH(metTupleSketchBytes), "
+ +
"GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes))
"
+ + "FROM " + getTableName()
+ + " WHERE id=" + i;
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(),
distinctCount);
+ query = "SELECT "
+ +
"GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes)
"
+ + "FILTER (WHERE id = " + i + ")) "
+ + "FROM " + getTableName();
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ // Query Type 1:
+ String query = "SELECT
DISTINCT_COUNT_TUPLE_SKETCH(metTupleSketchBytes), "
+ +
"GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes))
"
+ + "FROM " + getTableName()
+ + " WHERE id=" + i + " OR id=" + j;
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(),
distinctCount);
+
+ // Query Type 2:
+ query = "SELECT "
+ +
"GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes)
"
+ + "FILTER (WHERE id = " + i + " OR id = " + j + ")) "
+ + "FROM " + getTableName();
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+
+ // Query Type 3:
+ query = "SELECT "
+ + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_UNION( "
+ +
"DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FILTER (WHERE
id = " + i + "),"
+ +
"DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FILTER (WHERE
id = " + j + ")))"
+ + " FROM " + getTableName();
+
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+ }
+ }
+ }
+
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testTupleIntersectionQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ for (int i = 1; i < 9; i++) { // Query with Intersection
+ String query = "SELECT "
+ + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_INTERSECT( "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+ + "FILTER (WHERE id = " + (i - 1) + " OR id = " + i + "),"
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+ + "FILTER (WHERE id = " + i + " OR id = " + (i + 1) + "))),"
+
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+ + "FILTER (WHERE id = " + (i - 1) + " OR id = " + i + "),"
+
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+ + "FILTER (WHERE id = " + (i + 1) + " OR id = " + i + ")"
+ + " FROM " + getTableName();
+ JsonNode jsonNode = postQuery(query);
+
+ String sketch1 =
jsonNode.get("resultTable").get("rows").get(0).get(1).asText();
+ String sketch2 =
jsonNode.get("resultTable").get("rows").get(0).get(2).asText();
+ Sketch<IntegerSummary> deserializedSketch1 =
+
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(Base64.getDecoder().decode(sketch1));
+ Sketch<IntegerSummary> deserializedSketch2 =
+
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(Base64.getDecoder().decode(sketch2));
+ Intersection<IntegerSummary> intersection = new Intersection<>(new
IntegerSummarySetOperations(
+ IntegerSummary.Mode.Sum, IntegerSummary.Mode.Sum));
+ intersection.intersect(deserializedSketch1);
+ intersection.intersect(deserializedSketch2);
+ long estimate = (long) intersection.getResult().getEstimate();
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
estimate);
+ }
+ }
+
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query =
+ String.format(
+ "SELECT "
+ + "DISTINCT_COUNT_TUPLE_SKETCH(%s), "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(%s), "
+ + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(%s), "
+ + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(%s) "
+ + "FROM "
+ + "("
+ + "SELECT %s FROM %s WHERE %s = 4 "
+ + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 5 "
+ + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 6 "
+ + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 7 "
+ + ")",
+ MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+ MET_TUPLE_SKETCH_BYTES, getTableName(), ID,
MET_TUPLE_SKETCH_BYTES, getTableName(), ID,
+ MET_TUPLE_SKETCH_BYTES, getTableName(), ID,
MET_TUPLE_SKETCH_BYTES, getTableName(), ID);
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+ Sketch<IntegerSummary> deserializedSketch =
+
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(rawSketchBytes);
+
+ assertTrue(distinctCount > 0);
+ assertEquals(Double.valueOf(deserializedSketch.getEstimate()).longValue(),
distinctCount);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(2).asLong()
> 0);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong()
> 0);
+ }
+
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testJoinWithSketchQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query =
+ String.format(
+ "SELECT "
+ + "DISTINCT_COUNT_TUPLE_SKETCH(a.%s), "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+ + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+ + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+ + "DISTINCT_COUNT_TUPLE_SKETCH(b.%s), "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(b.%s), "
+ + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(b.%s), "
+ + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(b.%s) "
+ + "FROM "
+ + "(SELECT * FROM %s WHERE %s < 8 ) a "
+ + "JOIN "
+ + "(SELECT * FROM %s WHERE %s > 3 ) b "
+ + "ON a.%s = b.%s",
+ MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+ MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+ getTableName(), ID, getTableName(), ID, ID, ID);
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+ Sketch<IntegerSummary> deserializedSketch =
+
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(rawSketchBytes);
+ assertTrue(distinctCount > 0);
+ assertEquals(Double.valueOf(deserializedSketch.getEstimate()).longValue(),
distinctCount);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(2).asLong()
> 0);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong()
> 0);
+
+ distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(4).asLong();
+ rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(5).asText());
+ deserializedSketch =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(rawSketchBytes);
+ assertTrue(distinctCount > 0);
+ assertEquals(Double.valueOf(deserializedSketch.getEstimate()).longValue(),
distinctCount);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(6).asLong()
> 0);
+ assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(7).asLong()
> 0);
+ }
+
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testJoinAndIntersectionWithSketchQueries(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query =
+ "SELECT "
+ + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_INTERSECT( "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(a." +
MET_TUPLE_SKETCH_BYTES + "), "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(b." +
MET_TUPLE_SKETCH_BYTES + "))), "
+ + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_UNION( "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(a." +
MET_TUPLE_SKETCH_BYTES + "), "
+ + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(b." +
MET_TUPLE_SKETCH_BYTES + "))) "
+ + "FROM "
+ + "(SELECT * FROM " + getTableName() + " WHERE id < 8 ) a "
+ + "JOIN "
+ + "(SELECT * FROM " + getTableName() + " WHERE id > 3 ) b "
+ + "ON a.id = b.id";
+ JsonNode jsonNode = postQuery(query);
+ long distinctCountIntersection =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ long distinctCountUnion =
jsonNode.get("resultTable").get("rows").get(0).get(1).asLong();
+ assertTrue(distinctCountIntersection <= distinctCountUnion);
+ }
+
@Override
public String getTableName() {
return DEFAULT_TABLE_NAME;
@@ -79,6 +273,7 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
@Override
public Schema createSchema() {
return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension(ID, FieldSpec.DataType.INT)
.addMetric(MET_TUPLE_SKETCH_BYTES, FieldSpec.DataType.BYTES)
.build();
}
@@ -89,6 +284,8 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(ImmutableList.of(
+ new org.apache.avro.Schema.Field(ID,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null,
+ null),
new org.apache.avro.Schema.Field(MET_TUPLE_SKETCH_BYTES,
org.apache.avro.Schema.create(
org.apache.avro.Schema.Type.BYTES), null, null)));
@@ -99,6 +296,7 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(ID, RandomUtils.nextInt(10));
record.put(MET_TUPLE_SKETCH_BYTES,
ByteBuffer.wrap(getRandomRawValue()));
// add avro record to file
fileWriter.append(record);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]