This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch hll_sketch in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git
commit 16374bc881673804367f5b5a6ded19b8d759278d Author: AlexanderSaydakov <[email protected]> AuthorDate: Thu Jun 27 11:34:42 2019 -0700 hll sketch --- META.json | 15 +- Makefile | 7 +- README.md | 48 +++++- datasketches.control | 2 +- sql/datasketches_hll_sketch.sql | 147 ++++++++++++++++++ src/hll_sketch_c_adapter.cpp | 158 +++++++++++++++++++ src/hll_sketch_c_adapter.h | 52 +++++++ src/hll_sketch_pg_functions.c | 330 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 747 insertions(+), 12 deletions(-) diff --git a/META.json b/META.json index 98a2747..328109f 100644 --- a/META.json +++ b/META.json @@ -1,7 +1,7 @@ { "name": "datasketches", "abstract": "approximate algorithms for big data analysis", - "version": "1.2.0", + "version": "1.3.0", "maintainer": [ "Alexander Saydakov <[email protected]>", "Sketches User List <[email protected]>" @@ -11,22 +11,27 @@ "cpc_sketch": { "abstract": "CPC sketch for approximate distinct counting", "file": "sql/datasketches_cpc_sketch.sql", - "version": "1.2.0" + "version": "1.3.0" }, "theta_sketch": { "abstract": "Theta sketch for approximate distinct counting with set operations", "file": "sql/datasketches_theta_sketch.sql", - "version": "1.2.0" + "version": "1.3.0" + }, + "hll_sketch": { + "abstract": "HLL sketch for approximate distinct counting", + "file": "sql/datasketches_hll_sketch.sql", + "version": "1.3.0" }, "kll_float_sketch": { "abstract": "KLL quantiles sketch for approximating distributions of float values (quanitles, ranks, histograms)", "file": "sql/datasketches_kll_float_sketch.sql", - "version": "1.2.0" + "version": "1.3.0" }, "frequent_strings_sketch": { "abstract": "frequent items sketch for approximate computation of the most frequent strings", "file": "sql/datasketches_frequent_strings_sketch.sql", - "version": "1.2.0" + "version": "1.3.0" } }, "resources": { diff --git a/Makefile b/Makefile index 6cc3a54..8b25629 100644 --- a/Makefile +++ b/Makefile @@ -5,17 +5,18 @@ OBJS = src/base64.o src/common.o 
\ src/kll_float_sketch_pg_functions.o src/kll_float_sketch_c_adapter.o \ src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \ src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o \ - src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o + src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o \ + src/hll_sketch_pg_functions.o src/hll_sketch_c_adapter.o # assume a copy or link datasketches-cpp in the current dir CORE = datasketches-cpp CPC = $(CORE)/cpc/src OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o -DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql sql/datasketches_frequent_strings_sketch.sql +DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql sql/datasketches_frequent_strings_sketch.sql sql/datasketches_hll_sketch.sql CXX = g++-8 -PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include +PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include SHLIB_LINK = -lstdc++ -L/usr/local/lib PG_CONFIG = pg_config diff --git a/README.md b/README.md index 3b6c817..9ada639 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,9 @@ See [Datasketches documentation](https://datasketches.github.io/) for details. 
This module currently supports the following sketches: -- CPC (Compressed Probabilistic Counting) sketch - very compact (when serialized) distinct-counting sketch +- CPC (Compressed Probabilistic Counting) sketch - very compact (smaller than HLL when serialized) distinct-counting sketch - Theta sketch - distinct counting with set operations (intersection, a-not-b) +- HLL sketch - very compact distinct-counting sketch based on HyperLogLog algorithm - KLL float quantiles sketch - for estimating distributions: quantile, rank, PMF (histogram), CDF - Frequent strings sketch - capture the heaviest items (strings) by count or by some other weight @@ -36,7 +37,7 @@ Approximate count distinct: Note that the above one-off distinct count is just to show the basic usage. Most importantly, the sketch can be used as an "additive" distinct count metric in a data cube. -Merging sketches: +Aggregate union: create table cpc_sketch_test(sketch cpc_sketch); insert into cpc_sketch_test select cpc_sketch_build(1); @@ -47,6 +48,13 @@ Merging sketches: ------------------------- 3.00024414612919 +Non-aggregate union: + + select cpc_sketch_get_estimate(cpc_sketch_union(cpc_sketch_build(1), cpc_sketch_build(2))); + cpc_sketch_get_estimate + ------------------------- + 2.00016277723359 + <h2>Distinct counting with Theta sketch</h2> See above for the exact distinct count of 100 million random integers @@ -101,9 +109,43 @@ Non-aggregate set operations: 1 (2 rows) +<h2>Distinct counting with HLL sketch</h2> + +See above for the exact distinct count of 100 million random integers + +Approximate distinct count: + + $ time psql test -c "select hll_sketch_distinct(id) from random_ints_100m" + hll_sketch_distinct + --------------------- + 63826337.5738399 + (1 row) + + real 0m19.075s + +Note that the above one-off distinct count is just to show the basic usage. Most importantly, the sketch can be used as an "additive" distinct count metric in a data cube. 
+ +Aggregate union: + + create table hll_sketch_test(sketch hll_sketch); + insert into hll_sketch_test select hll_sketch_build(1); + insert into hll_sketch_test select hll_sketch_build(2); + insert into hll_sketch_test select hll_sketch_build(3); + select hll_sketch_get_estimate(hll_sketch_union(sketch)) from hll_sketch_test; + hll_sketch_get_estimate + ------------------------- + 3.00000001490116 + +Non-aggregate union: + + select hll_sketch_get_estimate(hll_sketch_union(hll_sketch_build(1), hll_sketch_build(2))); + hll_sketch_get_estimate + ------------------------- + 2.00000000496705 + <h2>Estimating quanitles, ranks and histograms with KLL sketch</h2> -Table "normal" has 1 million values from the normal distribution with mean=0 and stddev=1. +Table "normal" has 1 million values from the normal (Gaussian) distribution with mean=0 and stddev=1. We can build a sketch, which represents the distribution (create table kll\_float\_sketch\_test(sketch kll\_float\_sketch)): $ psql test -c "insert into kll_float_sketch_test select kll_float_sketch_build(value) from normal" diff --git a/datasketches.control b/datasketches.control index 6fdc30f..7433fb9 100644 --- a/datasketches.control +++ b/datasketches.control @@ -1,6 +1,6 @@ # Datasketches module comment = 'Aggregation functions and data types for approximate algorithms.' -default_version = '1.2.0' +default_version = '1.3.0' relocatable = true module_pathname = '$libdir/datasketches' diff --git a/sql/datasketches_hll_sketch.sql b/sql/datasketches_hll_sketch.sql new file mode 100644 index 0000000..e19f972 --- /dev/null +++ b/sql/datasketches_hll_sketch.sql @@ -0,0 +1,147 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- HLL (HyperLogLog) distinct-counting sketch type and functions.
+-- Statement order matters: shell type, I/O functions, full type definition,
+-- casts, support functions, then the aggregates that reference them.
+
+-- Shell type, so that the I/O functions below can mention hll_sketch.
+CREATE TYPE hll_sketch;
+
+-- Text I/O is bound to the generic 'pg_sketch_in' / 'pg_sketch_out' entry
+-- points (shared symbol names, not HLL-specific; defined outside this file).
+CREATE OR REPLACE FUNCTION hll_sketch_in(cstring) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_sketch_in'
+  LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_out(hll_sketch) RETURNS cstring
+  AS '$libdir/datasketches', 'pg_sketch_out'
+  LANGUAGE C STRICT IMMUTABLE;
+
+CREATE TYPE hll_sketch (
+  INPUT = hll_sketch_in,
+  OUTPUT = hll_sketch_out,
+  STORAGE = EXTERNAL
+);
+
+-- Binary-compatible casts: the C functions read an hll_sketch argument as a
+-- bytea holding the serialized sketch.
+CREATE CAST (bytea as hll_sketch) WITHOUT FUNCTION AS ASSIGNMENT;
+CREATE CAST (hll_sketch as bytea) WITHOUT FUNCTION AS ASSIGNMENT;
+
+-- Transition function of hll_sketch_distinct / hll_sketch_build.
+-- Not STRICT: the internal state is NULL on the first call (the C code then
+-- creates the sketch) and NULL input values are skipped by the C code.
+-- Optional int arguments: lg_k (0 selects the default of 12) and the target
+-- HLL type (0 selects the default, otherwise 4, 6 or 8).
+CREATE OR REPLACE FUNCTION hll_sketch_add_item(internal, anyelement) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_add_item'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_add_item(internal, anyelement, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_add_item'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_add_item(internal, anyelement, int, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_add_item'
+  LANGUAGE C IMMUTABLE;
+
+-- Distinct-count estimate of a stored sketch.
+CREATE OR REPLACE FUNCTION hll_sketch_get_estimate(hll_sketch) RETURNS double precision
+  AS '$libdir/datasketches', 'pg_hll_sketch_get_estimate'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- Returns {estimate, lower bound, upper bound}; the optional int is the number
+-- of standard deviations for the bounds (0 selects 1).
+CREATE OR REPLACE FUNCTION hll_sketch_get_estimate_and_bounds(hll_sketch) RETURNS double precision[]
+  AS '$libdir/datasketches', 'pg_hll_sketch_get_estimate_and_bounds'
+  LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_get_estimate_and_bounds(hll_sketch, int) RETURNS double precision[]
+  AS '$libdir/datasketches', 'pg_hll_sketch_get_estimate_and_bounds'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- Final function of hll_sketch_build: serializes the internal sketch.
+CREATE OR REPLACE FUNCTION hll_sketch_from_internal(internal) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_hll_sketch_from_internal'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- Final function of hll_sketch_distinct: estimate of the internal sketch.
+CREATE OR REPLACE FUNCTION hll_sketch_get_estimate_from_internal(internal) RETURNS double precision
+  AS '$libdir/datasketches', 'pg_hll_sketch_get_estimate_from_internal'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- Human-readable summary of a sketch.
+CREATE OR REPLACE FUNCTION hll_sketch_to_string(hll_sketch) RETURNS TEXT
+  AS '$libdir/datasketches', 'pg_hll_sketch_to_string'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- Transition function of the hll_sketch_union aggregate (not STRICT: the state
+-- starts as NULL). Optional int arguments: lg_k and target HLL type, as for
+-- hll_sketch_add_item.
+CREATE OR REPLACE FUNCTION hll_sketch_union_agg(internal, hll_sketch) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_union_agg'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_union_agg(internal, hll_sketch, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_union_agg'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_union_agg(internal, hll_sketch, int, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_hll_sketch_union_agg'
+  LANGUAGE C IMMUTABLE;
+
+-- Final function of the hll_sketch_union aggregate: the union result as a sketch.
+CREATE OR REPLACE FUNCTION hll_union_get_result(internal) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_hll_union_get_result'
+  LANGUAGE C STRICT IMMUTABLE;
+
+-- One-off approximate distinct count of a column.
+CREATE AGGREGATE hll_sketch_distinct(anyelement) (
+  sfunc = hll_sketch_add_item,
+  stype = internal,
+  finalfunc = hll_sketch_get_estimate_from_internal
+);
+
+CREATE AGGREGATE hll_sketch_distinct(anyelement, int) (
+  sfunc = hll_sketch_add_item,
+  stype = internal,
+  finalfunc = hll_sketch_get_estimate_from_internal
+);
+
+-- Builds a sketch from a column; optional lg_k and target HLL type.
+CREATE AGGREGATE hll_sketch_build(anyelement) (
+  sfunc = hll_sketch_add_item,
+  stype = internal,
+  finalfunc = hll_sketch_from_internal
+);
+
+CREATE AGGREGATE hll_sketch_build(anyelement, int) (
+  sfunc = hll_sketch_add_item,
+  stype = internal,
+  finalfunc = hll_sketch_from_internal
+);
+
+CREATE AGGREGATE hll_sketch_build(anyelement, int, int) (
+  sfunc = hll_sketch_add_item,
+  stype = internal,
+  finalfunc = hll_sketch_from_internal
+);
+
+-- Aggregate union of stored sketches; optional lg_k and target HLL type.
+CREATE AGGREGATE hll_sketch_union(hll_sketch) (
+  sfunc = hll_sketch_union_agg,
+  stype = internal,
+  finalfunc = hll_union_get_result
+);
+
+CREATE AGGREGATE hll_sketch_union(hll_sketch, int) (
+  sfunc = hll_sketch_union_agg,
+  stype = internal,
+  finalfunc = hll_union_get_result
+);
+
+CREATE AGGREGATE hll_sketch_union(hll_sketch, int, int) (
+  sfunc = hll_sketch_union_agg,
+  stype = internal,
+  finalfunc = hll_union_get_result
+);
+
+-- Non-aggregate union of two sketches. Not STRICT: a NULL sketch argument is
+-- skipped by the C code. Optional lg_k and target HLL type.
+CREATE OR REPLACE FUNCTION hll_sketch_union(hll_sketch, hll_sketch) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_hll_sketch_union'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_union(hll_sketch, hll_sketch, int) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_hll_sketch_union'
+  LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION hll_sketch_union(hll_sketch, hll_sketch, int, int) RETURNS hll_sketch
+  AS '$libdir/datasketches', 'pg_hll_sketch_union'
+  LANGUAGE C IMMUTABLE;
diff --git a/src/hll_sketch_c_adapter.cpp b/src/hll_sketch_c_adapter.cpp
new file mode 100644
index 0000000..53320f9
--- /dev/null
+++ b/src/hll_sketch_c_adapter.cpp
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "hll_sketch_c_adapter.h" +#include "allocator.h" + +#include <sstream> + +#include <hll.hpp> + +typedef datasketches::HllSketch<palloc_allocator<char>> hll_sketch; +typedef datasketches::HllUnion<palloc_allocator<char>> hll_union; + +void* hll_sketch_new(unsigned lg_k) { + try { + return new (palloc(sizeof(hll_sketch))) hll_sketch(lg_k); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_sketch_new_tgt_type(unsigned lg_k, unsigned tgt_type) { + try { + return new (palloc(sizeof(hll_sketch))) hll_sketch( + lg_k, + tgt_type == 4 ? datasketches::TgtHllType::HLL_4 : tgt_type == 6 ? 
datasketches::TgtHllType::HLL_6 : datasketches::TgtHllType::HLL_8 + ); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void hll_sketch_delete(void* sketchptr) { + try { + static_cast<hll_sketch*>(sketchptr)->~hll_sketch(); + pfree(sketchptr); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void hll_sketch_update(void* sketchptr, const void* data, unsigned length) { + try { + static_cast<hll_sketch*>(sketchptr)->update(data, length); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +double hll_sketch_get_estimate(const void* sketchptr) { + try { + return static_cast<const hll_sketch*>(sketchptr)->getEstimate(); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +Datum* hll_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) { + try { + Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3); + est_and_bounds[0] = Float8GetDatum(static_cast<const hll_sketch*>(sketchptr)->getEstimate()); + est_and_bounds[1] = Float8GetDatum(static_cast<const hll_sketch*>(sketchptr)->getLowerBound(num_std_devs)); + est_and_bounds[2] = Float8GetDatum(static_cast<const hll_sketch*>(sketchptr)->getUpperBound(num_std_devs)); + return est_and_bounds; + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void hll_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) { + try { + std::stringstream s; + static_cast<const hll_sketch*>(sketchptr)->to_string(s); + snprintf(buffer, length, s.str().c_str()); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_sketch_serialize(const void* sketchptr) { + try { + auto data = static_cast<const hll_sketch*>(sketchptr)->serializeCompact(VARHDRSZ); + bytea* buffer = (bytea*) data.first.release(); + const size_t length = data.second; + SET_VARSIZE(buffer, length); + return buffer; + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_sketch_deserialize(const char* buffer, unsigned 
length) { + try { + hll_sketch* sketchptr = new (palloc(sizeof(hll_sketch))) hll_sketch(hll_sketch::deserialize(buffer, length)); + return sketchptr; + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_union_new(unsigned lg_k) { + try { + return new (palloc(sizeof(hll_union))) hll_union(lg_k); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void hll_union_delete(void* unionptr) { + try { + static_cast<hll_union*>(unionptr)->~hll_union(); + pfree(unionptr); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void hll_union_update(void* unionptr, const void* sketchptr) { + try { + static_cast<hll_union*>(unionptr)->update(*static_cast<const hll_sketch*>(sketchptr)); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_union_get_result(void* unionptr) { + try { + return new (palloc(sizeof(hll_sketch))) hll_sketch(static_cast<hll_union*>(unionptr)->getResult()); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} + +void* hll_union_get_result_tgt_type(void* unionptr, unsigned tgt_type) { + try { + return new (palloc(sizeof(hll_sketch))) hll_sketch(static_cast<hll_union*>(unionptr)->getResult( + tgt_type == 4 ? datasketches::TgtHllType::HLL_4 : tgt_type == 6 ? datasketches::TgtHllType::HLL_6 : datasketches::TgtHllType::HLL_8 + )); + } catch (std::exception& e) { + elog(ERROR, e.what()); + } +} diff --git a/src/hll_sketch_c_adapter.h b/src/hll_sketch_c_adapter.h new file mode 100644 index 0000000..1f291a1 --- /dev/null +++ b/src/hll_sketch_c_adapter.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef HLL_SKETCH_C_ADAPTER_H +#define HLL_SKETCH_C_ADAPTER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include <postgres.h> + +void* hll_sketch_new(unsigned lg_k); +void* hll_sketch_new_tgt_type(unsigned lg_k, unsigned tgt_type); +void hll_sketch_delete(void* sketchptr); + +void hll_sketch_update(void* sketchptr, const void* data, unsigned length); +void hll_sketch_merge(void* sketchptr1, const void* sketchptr2); +double hll_sketch_get_estimate(const void* sketchptr); +Datum* hll_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs); +void hll_sketch_to_string(const void* sketchptr, char* buffer, unsigned length); + +void* hll_sketch_serialize(const void* sketchptr); +void* hll_sketch_deserialize(const char* buffer, unsigned length); + +void* hll_union_new(unsigned lg_k); +void hll_union_delete(void* unionptr); +void hll_union_update(void* unionptr, const void* sketchptr); +void* hll_union_get_result(void* unionptr); +void* hll_union_get_result_tgt_type(void* unionptr, unsigned tgt_type); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/hll_sketch_pg_functions.c b/src/hll_sketch_pg_functions.c new file mode 100644 index 0000000..59cdbe2 --- /dev/null +++ b/src/hll_sketch_pg_functions.c @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <postgres.h> +#include <fmgr.h> +#include <utils/lsyscache.h> +#include <utils/builtins.h> +#include <utils/array.h> +#include <catalog/pg_type.h> + +#include "hll_sketch_c_adapter.h" +#include "base64.h" + +const unsigned HLL_DEFAULT_LG_K = 12; + +/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */ +PG_FUNCTION_INFO_V1(pg_hll_sketch_add_item); +PG_FUNCTION_INFO_V1(pg_hll_sketch_get_estimate); +PG_FUNCTION_INFO_V1(pg_hll_sketch_get_estimate_and_bounds); +PG_FUNCTION_INFO_V1(pg_hll_sketch_to_string); +PG_FUNCTION_INFO_V1(pg_hll_sketch_union_agg); +PG_FUNCTION_INFO_V1(pg_hll_sketch_from_internal); +PG_FUNCTION_INFO_V1(pg_hll_sketch_get_estimate_from_internal); +PG_FUNCTION_INFO_V1(pg_hll_union_get_result); +PG_FUNCTION_INFO_V1(pg_hll_sketch_union); + +/* function declarations */ +Datum pg_hll_sketch_recv(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_send(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_add_item(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_get_estimate(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_to_string(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_union_agg(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_from_internal(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS); +Datum pg_hll_union_get_result(PG_FUNCTION_ARGS); +Datum pg_hll_sketch_union(PG_FUNCTION_ARGS); + +Datum 
pg_hll_sketch_add_item(PG_FUNCTION_ARGS) { + void* sketchptr; + unsigned lg_k; + unsigned tgt_type; + + // anyelement + Oid element_type; + Datum element; + int16 typlen; + bool typbyval; + char typalign; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) { + PG_RETURN_NULL(); + } else if (PG_ARGISNULL(1)) { + PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state + } + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "hll_sketch_add_item called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + if (PG_ARGISNULL(0)) { + lg_k = PG_GETARG_INT32(2); + if (lg_k == 0) lg_k = HLL_DEFAULT_LG_K; + tgt_type = PG_GETARG_INT32(3); + if (tgt_type) { + if ((tgt_type != 4) && (tgt_type != 6) && (tgt_type != 8)) { + elog(ERROR, "hll_sketch_add_item: unsupported target type, must be 4, 6 or 8"); + } + sketchptr = hll_sketch_new_tgt_type(lg_k, tgt_type); + } else { + sketchptr = hll_sketch_new(lg_k); + } + } else { + sketchptr = PG_GETARG_POINTER(0); + } + + element_type = get_fn_expr_argtype(fcinfo->flinfo, 1); + element = PG_GETARG_DATUM(1); + get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign); + if (typlen == -1) { + // varlena + hll_sketch_update(sketchptr, VARDATA_ANY(element), VARSIZE_ANY_EXHDR(element)); + } else if (typbyval) { + // fixed-length passed by value + hll_sketch_update(sketchptr, &element, typlen); + } else { + // fixed-length passed by reference + hll_sketch_update(sketchptr, (void*)element, typlen); + } + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_POINTER(sketchptr); +} + +Datum pg_hll_sketch_get_estimate(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + double estimate; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = hll_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + estimate = hll_sketch_get_estimate(sketchptr); + hll_sketch_delete(sketchptr); + 
PG_RETURN_FLOAT8(estimate); +} + +Datum pg_hll_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + int num_std_devs; + + // output array + Datum* est_and_bounds; + ArrayType* arr_out; + int16 elmlen_out; + bool elmbyval_out; + char elmalign_out; + + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = hll_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + num_std_devs = PG_GETARG_INT32(1); + if (num_std_devs == 0) num_std_devs = 1; // default + est_and_bounds = hll_sketch_get_estimate_and_bounds(sketchptr, num_std_devs); + hll_sketch_delete(sketchptr); + + // construct output array + get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out); + arr_out = construct_array(est_and_bounds, 3, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out); + PG_RETURN_ARRAYTYPE_P(arr_out); +} + +Datum pg_hll_sketch_to_string(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + char str[1024]; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = hll_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + hll_sketch_to_string(sketchptr, str, 1024); + hll_sketch_delete(sketchptr); + PG_RETURN_TEXT_P(cstring_to_text(str)); +} + +struct hll_union_state { + void* unionptr; + unsigned tgt_type; +}; + +Datum pg_hll_sketch_union_agg(PG_FUNCTION_ARGS) { + struct hll_union_state* stateptr; + bytea* sketch_bytes; + void* sketchptr; + unsigned lg_k; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) { + PG_RETURN_NULL(); + } else if (PG_ARGISNULL(1)) { + PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. 
return unmodified state + } + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "hll_sketch_union_agg called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + if (PG_ARGISNULL(0)) { + lg_k = PG_GETARG_INT32(2); + stateptr = palloc(sizeof(struct hll_union_state)); + stateptr->unionptr = hll_union_new(lg_k ? lg_k : HLL_DEFAULT_LG_K); + stateptr->tgt_type = PG_GETARG_INT32(3); + if (stateptr->tgt_type) { + if ((stateptr->tgt_type != 4) && (stateptr->tgt_type != 6) && (stateptr->tgt_type != 8)) { + elog(ERROR, "hll_sketch_union_agg: unsupported target type, must be 4, 6 or 8"); + } + } + } else { + stateptr = (struct hll_union_state*) PG_GETARG_POINTER(0); + } + + sketch_bytes = PG_GETARG_BYTEA_P(1); + sketchptr = hll_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ); + hll_union_update(stateptr->unionptr, sketchptr); + hll_sketch_delete(sketchptr); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_POINTER(stateptr); +} + +Datum pg_hll_sketch_from_internal(PG_FUNCTION_ARGS) { + void* sketchptr; + bytea* bytes_out; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0)) PG_RETURN_NULL(); + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "hll_sketch_from_internal called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + sketchptr = PG_GETARG_POINTER(0); + bytes_out = hll_sketch_serialize(sketchptr); + hll_sketch_delete(sketchptr); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_BYTEA_P(bytes_out); +} + +Datum pg_hll_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) { + void* sketchptr; + double estimate; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0)) PG_RETURN_NULL(); + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "hll_sketch_from_internal called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + sketchptr = 
PG_GETARG_POINTER(0); + estimate = hll_sketch_get_estimate(sketchptr); + hll_sketch_delete(sketchptr); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_FLOAT8(estimate); +} + +Datum pg_hll_union_get_result(PG_FUNCTION_ARGS) { + struct hll_union_state* stateptr; + void* sketchptr; + bytea* bytes_out; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0)) PG_RETURN_NULL(); + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "hll_union_get_result called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + stateptr = (struct hll_union_state*) PG_GETARG_POINTER(0); + if (stateptr->tgt_type) { + sketchptr = hll_union_get_result_tgt_type(stateptr->unionptr, stateptr->tgt_type); + } else { + sketchptr = hll_union_get_result(stateptr->unionptr); + } + bytes_out = hll_sketch_serialize(sketchptr); + hll_sketch_delete(sketchptr); + hll_union_delete(stateptr->unionptr); + pfree(stateptr); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_BYTEA_P(bytes_out); +} + +Datum pg_hll_sketch_union(PG_FUNCTION_ARGS) { + const bytea* bytes_in1; + const bytea* bytes_in2; + void* sketchptr1; + void* sketchptr2; + void* unionptr; + void* sketchptr; + bytea* bytes_out; + unsigned lg_k; + unsigned tgt_type; + + lg_k = PG_GETARG_INT32(2); + tgt_type = PG_GETARG_INT32(3); + if (tgt_type) { + if ((tgt_type != 4) && (tgt_type != 6) && (tgt_type != 8)) { + elog(ERROR, "hll_sketch_union: unsupported target type, must be 4, 6 or 8"); + } + } + unionptr = hll_union_new(lg_k ? 
lg_k : HLL_DEFAULT_LG_K); + if (!PG_ARGISNULL(0)) { + bytes_in1 = PG_GETARG_BYTEA_P(0); + sketchptr1 = hll_sketch_deserialize(VARDATA(bytes_in1), VARSIZE(bytes_in1) - VARHDRSZ); + hll_union_update(unionptr, sketchptr1); + hll_sketch_delete(sketchptr1); + } + if (!PG_ARGISNULL(1)) { + bytes_in2 = PG_GETARG_BYTEA_P(1); + sketchptr2 = hll_sketch_deserialize(VARDATA(bytes_in2), VARSIZE(bytes_in2) - VARHDRSZ); + hll_union_update(unionptr, sketchptr2); + hll_sketch_delete(sketchptr2); + } + if (tgt_type) { + sketchptr = hll_union_get_result_tgt_type(unionptr, tgt_type); + } else { + sketchptr = hll_union_get_result(unionptr); + } + hll_union_delete(unionptr); + bytes_out = hll_sketch_serialize(sketchptr); + hll_sketch_delete(sketchptr); + PG_RETURN_BYTEA_P(bytes_out); +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
