This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 622e3c95adca5cf30a0aff6542556feab9b8a861 Author: Fucun Chu <chufu...@hotmail.com> AuthorDate: Fri Mar 12 16:11:13 2021 +0800 IMPALA-10580: Implement ds_theta_union_f() function This function receives two strings that are serialized Apache DataSketches Theta sketches, unions the two sketches, and returns the resulting sketch. Example: select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) from sketch_tbl; +-------------------------------------------------------+ | ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) | +-------------------------------------------------------+ | 15 | +-------------------------------------------------------+ Change-Id: I8329979b81ceeaad739a43fab79768ca9c2916fa Reviewed-on: http://gerrit.cloudera.org:8080/17179 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exprs/datasketches-functions-ir.cc | 31 +++++++++ be/src/exprs/datasketches-functions.h | 7 ++ common/function-registry/impala_functions.py | 2 + .../queries/QueryTest/datasketches-theta.test | 74 ++++++++++++++++++++++ 4 files changed, 114 insertions(+) diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc index bf0eb1a..3a3fa4a 100644 --- a/be/src/exprs/datasketches-functions-ir.cc +++ b/be/src/exprs/datasketches-functions-ir.cc @@ -21,6 +21,7 @@ #include "gutil/strings/substitute.h" #include "thirdparty/datasketches/hll.hpp" #include "thirdparty/datasketches/theta_sketch.hpp" +#include "thirdparty/datasketches/theta_union.hpp" #include "thirdparty/datasketches/theta_a_not_b.hpp" #include "thirdparty/datasketches/kll_sketch.hpp" #include "udf/udf-internal.h" @@ -159,6 +160,36 @@ StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx, return StringVal::null(); } +bool update_sketch_to_theta_union(FunctionContext* ctx, + const StringVal& serialized_sketch, datasketches::theta_union& union_sketch) { + if
(!serialized_sketch.is_null && serialized_sketch.len > 0) { + datasketches::theta_sketch::unique_ptr sketch_ptr; + if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) { + LogSketchDeserializationError(ctx); + return false; + } + union_sketch.update(*sketch_ptr); + } + return true; +} + +StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx, + const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) { + datasketches::theta_union union_sketch = datasketches::theta_union::builder().build(); + // Add both input sketches to the union. + if (!update_sketch_to_theta_union(ctx, first_serialized_sketch, union_sketch)) { + return StringVal::null(); + } + if (!update_sketch_to_theta_union(ctx, second_serialized_sketch, union_sketch)) { + return StringVal::null(); + } + // Result + datasketches::compact_theta_sketch sketch = union_sketch.get_result(); + std::stringstream serialized_input; + sketch.serialize(serialized_input); + return StringStreamToStringVal(ctx, serialized_input); +} + FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx, const StringVal& serialized_sketch, const DoubleVal& rank) { if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null(); diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h index 26f276c..15d9774 100644 --- a/be/src/exprs/datasketches-functions.h +++ b/be/src/exprs/datasketches-functions.h @@ -78,6 +78,13 @@ public: const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch); + /// 'first_serialized_sketch' and 'second_serialized_sketch' are expected as serialized + /// Apache DataSketches Theta sketches; if either is not, the query fails. NULL or empty + /// inputs count as empty sketches. Returns the serialized union of the two sketches.
+ static StringVal DsThetaUnionF(FunctionContext* ctx, + const StringVal& first_serialized_sketch, + const StringVal& second_serialized_sketch); + /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If /// it is not, then the query fails. 'rank' is used to identify which item (estimate) /// to return from the sketched dataset. E.g. 0.1 means the item where 10% of the diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py index ee9f87f..e46ef89 100644 --- a/common/function-registry/impala_functions.py +++ b/common/function-registry/impala_functions.py @@ -1007,6 +1007,8 @@ visible_functions = [ '_ZN6impala21DataSketchesFunctions15DsThetaEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'], [['ds_theta_exclude'], 'STRING', ['STRING', 'STRING'], '_ZN6impala21DataSketchesFunctions14DsThetaExcludeEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'], + [['ds_theta_union_f'], 'STRING', ['STRING', 'STRING'], + '_ZN6impala21DataSketchesFunctions13DsThetaUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'], [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'], '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'], [['ds_kll_n'], 'BIGINT', ['STRING'], diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test index 34a3a0e..27a0c06 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test +++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test @@ -407,4 +407,78 @@ select ds_theta_estimate(ds_theta_exclude(sketch1, sketch2)) from sketch_interme BIGINT ---- RESULTS 5 +==== +---- QUERY +# Checks that unioning a valid sketch with a null value results in the valid sketch +# being returned.
+select + ds_theta_estimate(ds_theta_union_f(date_sketch, null)), + ds_theta_estimate(ds_theta_union_f(null, float_sketch)) +from sketch_store; +---- TYPES +BIGINT,BIGINT +---- RESULTS +3,10 +3,10 +3,10 +3,10 +==== +---- QUERY +# Check that ds_theta_union_f() returns an empty sketch for an empty sketch. +select ds_theta_estimate(ds_theta_union_f(ds_theta_sketch(cast(f2 as float)), null)) +from functional_parquet.emptytable; +---- TYPES +BIGINT +---- RESULTS +0 +==== +---- QUERY +# Checks that ds_theta_union_f() returns an empty sketch for NULL inputs. +select ds_theta_estimate(ds_theta_union_f(null_str, some_nulls)) from +functional_parquet.nullrows where id='b'; +---- TYPES +BIGINT +---- RESULTS +0 +==== +---- QUERY +# ds_theta_union_f() returns an error if it receives an invalid serialized sketch. +select ds_theta_union_f(null, date_string_col) from functional_parquet.alltypestiny +where id=1; +---- CATCH +UDF ERROR: Unable to deserialize sketch. +==== +---- QUERY +# ds_theta_union_f() returns an error if it receives an invalid serialized sketch. +select ds_theta_union_f(date_string_col, null) from functional_parquet.alltypestiny +where id=1; +---- CATCH +UDF ERROR: Unable to deserialize sketch. +==== +---- QUERY +# Unions the sketches from theta_sketches_impala_hive2 and checks if the union produces +# the same result as if these sketches were used separately to get the estimates. 
+select + ds_theta_estimate(ds_theta_union_f(i_ti, h_ti)) as ti, + ds_theta_estimate(ds_theta_union_f(i_i, h_i)) as i, + ds_theta_estimate(ds_theta_union_f(i_bi, h_bi)) as bi, + ds_theta_estimate(ds_theta_union_f(i_f, h_f)) as f, + ds_theta_estimate(ds_theta_union_f(i_d, h_d)) as d, + ds_theta_estimate(ds_theta_union_f(i_s, h_s)) as s, + ds_theta_estimate(ds_theta_union_f(i_c, h_c)) as c, + ds_theta_estimate(ds_theta_union_f(i_v, h_v)) as v, + ds_theta_estimate(ds_theta_union_f(i_nc, h_nc)) as nc +from theta_sketches_impala_hive2; +---- TYPES +BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT +---- RESULTS +5,7,6,6,7,4,4,3,0 +==== +---- QUERY +# Union two sketches from different columns of sketch_intermediate. +select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) from sketch_intermediate; +---- TYPES +BIGINT +---- RESULTS +15 ==== \ No newline at end of file