This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch kll_double in repository https://gitbox.apache.org/repos/asf/datasketches-postgresql.git
commit 55a2a6282520355302b1dfa41ec87b2ababc2c49 Author: AlexanderSaydakov <[email protected]> AuthorDate: Wed May 25 13:37:34 2022 -0700 kll_double --- META.json | 7 +- Makefile | 2 + sql/datasketches_kll_double_sketch.sql | 115 ++++++ ...adapter.cpp => kll_double_sketch_c_adapter.cpp} | 64 ++-- src/kll_double_sketch_c_adapter.h | 52 +++ src/kll_double_sketch_pg_functions.c | 384 +++++++++++++++++++++ src/kll_float_sketch_c_adapter.cpp | 2 +- test/kll_double_sketch_test.sql | 33 ++ ...l_sketch_test.sql => kll_float_sketch_test.sql} | 0 9 files changed, 625 insertions(+), 34 deletions(-) diff --git a/META.json b/META.json index e1bc1ca..81a1b79 100644 --- a/META.json +++ b/META.json @@ -33,6 +33,11 @@ "file": "sql/datasketches_kll_float_sketch.sql", "version": "1.6.0-SNAPSHOT" }, + "kll_double_sketch": { + "abstract": "KLL quantiles sketch for approximating distributions of double values (quanitles, ranks, histograms)", + "file": "sql/datasketches_kll_double_sketch.sql", + "version": "1.6.0-SNAPSHOT" + }, "req_float_sketch": { "abstract": "REQ (Relative Error Quantiles) sketch for approximating distributions of float values (quanitles, ranks, histograms)", "file": "sql/datasketches_req_float_sketch.sql", @@ -44,7 +49,7 @@ "version": "1.6.0-SNAPSHOT" }, "quantiles_double_sketch": { - "abstract": "Quantiles sketch for approximating distributions of double values (quanitles, ranks, histograms)", + "abstract": "Quantiles sketch for approximating distributions of double values (quanitles, ranks, histograms), superseded by KLL sketch, included to support legacy sketch data from other platforms", "file": "sql/datasketches_quantiles_double_sketch.sql", "version": "1.6.0-SNAPSHOT" } diff --git a/Makefile b/Makefile index feea4ef..56c74d0 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ MODULE_big = datasketches SQL_MODULES = sql/datasketches_cpc_sketch.sql \ sql/datasketches_kll_float_sketch.sql \ + sql/datasketches_kll_double_sketch.sql \ sql/datasketches_theta_sketch.sql \ sql/datasketches_frequent_strings_sketch.sql \ sql/datasketches_hll_sketch.sql \ @@ -34,6 +35,7 @@ EXTRA_CLEAN = $(SQL_INSTALL) OBJS = src/global_hooks.o src/base64.o src/common.o \ src/kll_float_sketch_pg_functions.o src/kll_float_sketch_c_adapter.o \ + src/kll_double_sketch_pg_functions.o src/kll_double_sketch_c_adapter.o \ src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \ src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o \ src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o \ diff --git a/sql/datasketches_kll_double_sketch.sql b/sql/datasketches_kll_double_sketch.sql new file mode 100644 index 0000000..b606814 --- /dev/null +++ b/sql/datasketches_kll_double_sketch.sql @@ -0,0 +1,115 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TYPE kll_double_sketch; + +CREATE OR REPLACE FUNCTION kll_double_sketch_in(cstring) RETURNS kll_double_sketch + AS '$libdir/datasketches', 'pg_sketch_in' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_out(kll_double_sketch) RETURNS cstring + AS '$libdir/datasketches', 'pg_sketch_out' + LANGUAGE C STRICT IMMUTABLE; + +CREATE TYPE kll_double_sketch ( + INPUT = kll_double_sketch_in, + OUTPUT = kll_double_sketch_out, + STORAGE = EXTERNAL +); + +CREATE CAST (bytea as kll_double_sketch) WITHOUT FUNCTION AS ASSIGNMENT; +CREATE CAST (kll_double_sketch as bytea) WITHOUT FUNCTION AS ASSIGNMENT; + +CREATE OR REPLACE FUNCTION kll_double_sketch_add_item(internal, double precision) RETURNS internal + AS '$libdir/datasketches', 'pg_kll_double_sketch_add_item' + LANGUAGE C IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_add_item(internal, double precision, int) RETURNS internal + AS '$libdir/datasketches', 'pg_kll_double_sketch_add_item' + LANGUAGE C IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_rank(kll_double_sketch, double precision) RETURNS double precision + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_rank' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_quantile(kll_double_sketch, double precision) RETURNS double precision + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_quantile' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_n(kll_double_sketch) RETURNS bigint + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_n' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_to_string(kll_double_sketch) RETURNS TEXT + AS '$libdir/datasketches', 'pg_kll_double_sketch_to_string' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_merge(internal, kll_double_sketch) RETURNS internal + AS '$libdir/datasketches', 'pg_kll_double_sketch_merge' + LANGUAGE C IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_merge(internal, kll_double_sketch, int) RETURNS internal + AS '$libdir/datasketches', 'pg_kll_double_sketch_merge' + LANGUAGE C IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_from_internal(internal) RETURNS kll_double_sketch + AS '$libdir/datasketches', 'pg_kll_double_sketch_from_internal' + LANGUAGE C STRICT IMMUTABLE; + +CREATE AGGREGATE kll_double_sketch_build(double precision) ( + sfunc = kll_double_sketch_add_item, + stype = internal, + finalfunc = kll_double_sketch_from_internal +); + +CREATE AGGREGATE kll_double_sketch_build(double precision, int) ( + sfunc = kll_double_sketch_add_item, + stype = internal, + finalfunc = kll_double_sketch_from_internal +); + +CREATE AGGREGATE kll_double_sketch_merge(kll_double_sketch) ( + sfunc = kll_double_sketch_merge, + stype = internal, + finalfunc = kll_double_sketch_from_internal +); + +CREATE AGGREGATE kll_double_sketch_merge(kll_double_sketch, int) ( + sfunc = kll_double_sketch_merge, + stype = internal, + finalfunc = kll_double_sketch_from_internal +); + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_pmf(kll_double_sketch, double precision[]) RETURNS double precision[] + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_pmf' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_cdf(kll_double_sketch, double precision[]) RETURNS double precision[] + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_cdf' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_quantiles(kll_double_sketch, double precision[]) RETURNS double precision[] + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_quantiles' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_histogram(kll_double_sketch) RETURNS double precision[] + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_histogram' + LANGUAGE C STRICT IMMUTABLE; + +CREATE OR REPLACE FUNCTION kll_double_sketch_get_histogram(kll_double_sketch, int) RETURNS double precision[] + AS '$libdir/datasketches', 'pg_kll_double_sketch_get_histogram' + LANGUAGE C STRICT IMMUTABLE; diff --git a/src/kll_float_sketch_c_adapter.cpp b/src/kll_double_sketch_c_adapter.cpp similarity index 51% copy from src/kll_float_sketch_c_adapter.cpp copy to src/kll_double_sketch_c_adapter.cpp index b42ca30..e7f183f 100644 --- a/src/kll_float_sketch_c_adapter.cpp +++ b/src/kll_double_sketch_c_adapter.cpp @@ -17,78 +17,78 @@ * under the License. */ -#include "kll_float_sketch_c_adapter.h" +#include "kll_double_sketch_c_adapter.h" #include "allocator.h" #include "postgres_h_substitute.h" #include <kll_sketch.hpp> -typedef datasketches::kll_sketch<float, std::less<float>, datasketches::serde<float>, palloc_allocator<float>> kll_float_sketch; +using kll_double_sketch = datasketches::kll_sketch<double, std::less<double>, datasketches::serde<double>, palloc_allocator<double>>; -void* kll_float_sketch_new(unsigned k) { +void* kll_double_sketch_new(unsigned k) { try { - return new (palloc(sizeof(kll_float_sketch))) kll_float_sketch(k); + return new (palloc(sizeof(kll_double_sketch))) kll_double_sketch(k); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -void kll_float_sketch_delete(void* sketchptr) { +void kll_double_sketch_delete(void* sketchptr) { try { - static_cast<kll_float_sketch*>(sketchptr)->~kll_float_sketch(); + static_cast<kll_double_sketch*>(sketchptr)->~kll_double_sketch(); pfree(sketchptr); } catch (std::exception& e) { pg_error(e.what()); } } -void kll_float_sketch_update(void* sketchptr, float value) { +void kll_double_sketch_update(void* sketchptr, double value) { try { - static_cast<kll_float_sketch*>(sketchptr)->update(value); + static_cast<kll_double_sketch*>(sketchptr)->update(value); } catch (std::exception& e) { pg_error(e.what()); } } -void kll_float_sketch_merge(void* sketchptr1, const void* sketchptr2) { +void kll_double_sketch_merge(void* sketchptr1, const void* sketchptr2) { try { - static_cast<kll_float_sketch*>(sketchptr1)->merge(*static_cast<const kll_float_sketch*>(sketchptr2)); + static_cast<kll_double_sketch*>(sketchptr1)->merge(*static_cast<const kll_double_sketch*>(sketchptr2)); } catch (std::exception& e) { pg_error(e.what()); } } -double kll_float_sketch_get_rank(const void* sketchptr, float value) { +double kll_double_sketch_get_rank(const void* sketchptr, double value) { try { - return static_cast<const kll_float_sketch*>(sketchptr)->get_rank(value); + return static_cast<const kll_double_sketch*>(sketchptr)->get_rank(value); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -float kll_float_sketch_get_quantile(const void* sketchptr, double rank) { +double kll_double_sketch_get_quantile(const void* sketchptr, double rank) { try { - return static_cast<const kll_float_sketch*>(sketchptr)->get_quantile(rank); + return static_cast<const kll_double_sketch*>(sketchptr)->get_quantile(rank); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -unsigned long long kll_float_sketch_get_n(const void* sketchptr) { +unsigned long long kll_double_sketch_get_n(const void* sketchptr) { try { - return static_cast<const kll_float_sketch*>(sketchptr)->get_n(); + return static_cast<const kll_double_sketch*>(sketchptr)->get_n(); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -char* kll_float_sketch_to_string(const void* sketchptr) { +char* kll_double_sketch_to_string(const void* sketchptr) { try { - auto str = static_cast<const kll_float_sketch*>(sketchptr)->to_string(); + auto str = static_cast<const kll_double_sketch*>(sketchptr)->to_string(); const size_t len = str.length() + 1; char* buffer = (char*) palloc(len); strncpy(buffer, str.c_str(), len); @@ -99,11 +99,11 @@ char* kll_float_sketch_to_string(const void* sketchptr) { pg_unreachable(); } -ptr_with_size kll_float_sketch_serialize(const void* sketchptr, unsigned header_size) { +ptr_with_size kll_double_sketch_serialize(const void* sketchptr, unsigned header_size) { try { ptr_with_size p; - auto bytes = new (palloc(sizeof(kll_float_sketch::vector_bytes))) kll_float_sketch::vector_bytes( - static_cast<const kll_float_sketch*>(sketchptr)->serialize(header_size) + auto bytes = new (palloc(sizeof(kll_double_sketch::vector_bytes))) kll_double_sketch::vector_bytes( + static_cast<const kll_double_sketch*>(sketchptr)->serialize(header_size) ); p.ptr = bytes->data(); p.size = bytes->size(); @@ -114,31 +114,31 @@ ptr_with_size kll_float_sketch_serialize(const void* sketchptr, unsigned header_ pg_unreachable(); } -void* kll_float_sketch_deserialize(const char* buffer, unsigned length) { +void* kll_double_sketch_deserialize(const char* buffer, unsigned length) { try { - return new (palloc(sizeof(kll_float_sketch))) kll_float_sketch(kll_float_sketch::deserialize(buffer, length)); + return new (palloc(sizeof(kll_double_sketch))) kll_double_sketch(kll_double_sketch::deserialize(buffer, length)); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -unsigned kll_float_sketch_get_serialized_size_bytes(const void* sketchptr) { +unsigned kll_double_sketch_get_serialized_size_bytes(const void* sketchptr) { try { - return static_cast<const kll_float_sketch*>(sketchptr)->get_serialized_size_bytes(); + return static_cast<const kll_double_sketch*>(sketchptr)->get_serialized_size_bytes(); } catch (std::exception& e) { pg_error(e.what()); } pg_unreachable(); } -Datum* kll_float_sketch_get_pmf_or_cdf(const void* sketchptr, const float* split_points, unsigned num_split_points, bool is_cdf, bool scale) { +Datum* kll_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale) { try { auto array = is_cdf ? - static_cast<const kll_float_sketch*>(sketchptr)->get_CDF(split_points, num_split_points) : - static_cast<const kll_float_sketch*>(sketchptr)->get_PMF(split_points, num_split_points); + static_cast<const kll_double_sketch*>(sketchptr)->get_CDF(split_points, num_split_points) : + static_cast<const kll_double_sketch*>(sketchptr)->get_PMF(split_points, num_split_points); Datum* pmf = (Datum*) palloc(sizeof(Datum) * (num_split_points + 1)); - const uint64_t n = static_cast<const kll_float_sketch*>(sketchptr)->get_n(); + const uint64_t n = static_cast<const kll_double_sketch*>(sketchptr)->get_n(); for (unsigned i = 0; i < num_split_points + 1; i++) { if (scale) { pmf[i] = pg_float8_get_datum(array[i] * n); @@ -153,12 +153,12 @@ Datum* kll_float_sketch_get_pmf_or_cdf(const void* sketchptr, const float* split pg_unreachable(); } -Datum* kll_float_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions) { +Datum* kll_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions) { try { - auto array = static_cast<const kll_float_sketch*>(sketchptr)->get_quantiles(fractions, num_fractions); + auto array = static_cast<const kll_double_sketch*>(sketchptr)->get_quantiles(fractions, num_fractions); Datum* quantiles = (Datum*) palloc(sizeof(Datum) * num_fractions); for (unsigned i = 0; i < num_fractions; i++) { - quantiles[i] = pg_float4_get_datum(array[i]); + quantiles[i] = pg_float8_get_datum(array[i]); } return quantiles; } catch (std::exception& e) { diff --git a/src/kll_double_sketch_c_adapter.h b/src/kll_double_sketch_c_adapter.h new file mode 100644 index 0000000..08d172e --- /dev/null +++ b/src/kll_double_sketch_c_adapter.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef KLL_DOUBLE_SKETCH_C_ADAPTER_H +#define KLL_DOUBLE_SKETCH_C_ADAPTER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "ptr_with_size.h" + +static const unsigned DEFAULT_K = 200; + +void* kll_double_sketch_new(unsigned k); +void kll_double_sketch_delete(void* sketchptr); + +void kll_double_sketch_update(void* sketchptr, double value); +void kll_double_sketch_merge(void* sketchptr1, const void* sketchptr2); +double kll_double_sketch_get_rank(const void* sketchptr, double value); +double kll_double_sketch_get_quantile(const void* sketchptr, double rank); +unsigned long long kll_double_sketch_get_n(const void* sketchptr); +char* kll_double_sketch_to_string(const void* sketchptr); + +struct ptr_with_size kll_double_sketch_serialize(const void* sketchptr, unsigned header_size); +void* kll_double_sketch_deserialize(const char* buffer, unsigned length); +unsigned kll_double_sketch_get_serialized_size_bytes(const void* sketchptr); + +void** kll_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale); +void** kll_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/kll_double_sketch_pg_functions.c b/src/kll_double_sketch_pg_functions.c new file mode 100644 index 0000000..b1d9616 --- /dev/null +++ b/src/kll_double_sketch_pg_functions.c @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <postgres.h> +#include <fmgr.h> +#include <utils/lsyscache.h> +#include <utils/builtins.h> +#include <utils/array.h> +#include <catalog/pg_type.h> + +#include "kll_double_sketch_c_adapter.h" +#include "base64.h" + +/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */ +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_add_item); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_rank); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_quantile); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_n); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_to_string); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_merge); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_from_internal); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_pmf); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_cdf); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_quantiles); +PG_FUNCTION_INFO_V1(pg_kll_double_sketch_get_histogram); + +/* function declarations */ +Datum pg_kll_double_sketch_recv(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_send(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_add_item(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_rank(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_quantile(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_n(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_to_string(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_merge(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_from_internal(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_pmf(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_cdf(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_quantiles(PG_FUNCTION_ARGS); +Datum pg_kll_double_sketch_get_histogram(PG_FUNCTION_ARGS); + +static const unsigned DEFAULT_NUM_BINS = 10; + +Datum pg_kll_double_sketch_add_item(PG_FUNCTION_ARGS) { + void* sketchptr; + double value; + int k; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) { + PG_RETURN_NULL(); + } else if (PG_ARGISNULL(1)) { + PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state + } + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "kll_double_sketch_add_item called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + if (PG_ARGISNULL(0)) { + k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K; + sketchptr = kll_double_sketch_new(k); + } else { + sketchptr = PG_GETARG_POINTER(0); + } + + value = PG_GETARG_FLOAT8(1); + kll_double_sketch_update(sketchptr, value); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_POINTER(sketchptr); +} + +Datum pg_kll_double_sketch_get_rank(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + double value; + double rank; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + value = PG_GETARG_FLOAT8(1); + rank = kll_double_sketch_get_rank(sketchptr, value); + kll_double_sketch_delete(sketchptr); + PG_RETURN_FLOAT8(rank); +} + +Datum pg_kll_double_sketch_get_quantile(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + double value; + double rank; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + rank = PG_GETARG_FLOAT8(1); + value = kll_double_sketch_get_quantile(sketchptr, rank); + kll_double_sketch_delete(sketchptr); + PG_RETURN_FLOAT8(value); +} + +Datum pg_kll_double_sketch_get_n(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + uint64 n; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + n = kll_double_sketch_get_n(sketchptr); + kll_double_sketch_delete(sketchptr); + PG_RETURN_INT64(n); +} + +Datum pg_kll_double_sketch_to_string(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + char* str; + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + str = kll_double_sketch_to_string(sketchptr); + kll_double_sketch_delete(sketchptr); + PG_RETURN_TEXT_P(cstring_to_text(str)); +} + +Datum pg_kll_double_sketch_merge(PG_FUNCTION_ARGS) { + void* unionptr; + bytea* sketch_bytes; + void* sketchptr; + int k; + + MemoryContext oldcontext; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) { + PG_RETURN_NULL(); + } else if (PG_ARGISNULL(1)) { + PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state + } + + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "kll_double_sketch_merge called in non-aggregate context"); + } + oldcontext = MemoryContextSwitchTo(aggcontext); + + if (PG_ARGISNULL(0)) { + k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K; + unionptr = kll_double_sketch_new(k); + } else { + unionptr = PG_GETARG_POINTER(0); + } + + sketch_bytes = PG_GETARG_BYTEA_P(1); + sketchptr = kll_double_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ); + kll_double_sketch_merge(unionptr, sketchptr); + kll_double_sketch_delete(sketchptr); + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_POINTER(unionptr); +} + +Datum pg_kll_double_sketch_from_internal(PG_FUNCTION_ARGS) { + void* sketchptr; + struct ptr_with_size bytes_out; + MemoryContext aggcontext; + + if (PG_ARGISNULL(0)) PG_RETURN_NULL(); + if (!AggCheckCallContext(fcinfo, &aggcontext)) { + elog(ERROR, "kll_double_sketch_from_internal called in non-aggregate context"); + } + sketchptr = PG_GETARG_POINTER(0); + bytes_out = kll_double_sketch_serialize(sketchptr, VARHDRSZ); + kll_double_sketch_delete(sketchptr); + SET_VARSIZE(bytes_out.ptr, bytes_out.size); + PG_RETURN_BYTEA_P(bytes_out.ptr); +} + +Datum pg_kll_double_sketch_get_pmf(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + + // input array of split points + ArrayType* arr_in; + Oid elmtype_in; + int16 elmlen_in; + bool elmbyval_in; + char elmalign_in; + Datum* data_in; + bool* nulls_in; + int arr_len_in; + double* split_points; + + // output array of fractions + Datum* result; + ArrayType* arr_out; + int16 elmlen_out; + bool elmbyval_out; + char elmalign_out; + int arr_len_out; + + int i; + + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + + arr_in = PG_GETARG_ARRAYTYPE_P(1); + elmtype_in = ARR_ELEMTYPE(arr_in); + get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in); + deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in); + + split_points = palloc(sizeof(double) * arr_len_in); + for (i = 0; i < arr_len_in; i++) { + split_points[i] = DatumGetFloat8(data_in[i]); + } + result = (Datum*) kll_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, false, false); + pfree(split_points); + + // construct output array of fractions + arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals + get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out); + arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out); + + kll_double_sketch_delete(sketchptr); + + PG_RETURN_ARRAYTYPE_P(arr_out); +} + +Datum pg_kll_double_sketch_get_cdf(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + + // input array of split points + ArrayType* arr_in; + Oid elmtype_in; + int16 elmlen_in; + bool elmbyval_in; + char elmalign_in; + Datum* data_in; + bool* nulls_in; + int arr_len_in; + double* split_points; + + // output array of fractions + Datum* result; + ArrayType* arr_out; + int16 elmlen_out; + bool elmbyval_out; + char elmalign_out; + int arr_len_out; + + int i; + + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + + arr_in = PG_GETARG_ARRAYTYPE_P(1); + elmtype_in = ARR_ELEMTYPE(arr_in); + get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in); + deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in); + + split_points = palloc(sizeof(double) * arr_len_in); + for (i = 0; i < arr_len_in; i++) { + split_points[i] = DatumGetFloat8(data_in[i]); + } + result = (Datum*) kll_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, true, false); + pfree(split_points); + + // construct output array of fractions + arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals + get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out); + arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out); + + kll_double_sketch_delete(sketchptr); + + PG_RETURN_ARRAYTYPE_P(arr_out); +} + +Datum pg_kll_double_sketch_get_quantiles(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + + // input array of fractions + ArrayType* arr_in; + Oid elmtype_in; + int16 elmlen_in; + bool elmbyval_in; + char elmalign_in; + Datum* data_in; + bool* nulls_in; + int arr_len; + double* fractions; + + // output array of quantiles + Datum* quantiles; + ArrayType* arr_out; + int16 elmlen_out; + bool elmbyval_out; + char elmalign_out; + + int i; + + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + + arr_in = PG_GETARG_ARRAYTYPE_P(1); + elmtype_in = ARR_ELEMTYPE(arr_in); + get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in); + deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len); + + fractions = palloc(sizeof(double) * arr_len); + for (i = 0; i < arr_len; i++) { + fractions[i] = DatumGetFloat8(data_in[i]); + } + quantiles = (Datum*) kll_double_sketch_get_quantiles(sketchptr, fractions, arr_len); + pfree(fractions); + + // construct output array of quantiles + get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out); + arr_out = construct_array(quantiles, arr_len, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out); + + kll_double_sketch_delete(sketchptr); + + PG_RETURN_ARRAYTYPE_P(arr_out); +} + +Datum pg_kll_double_sketch_get_histogram(PG_FUNCTION_ARGS) { + const bytea* bytes_in; + void* sketchptr; + int num_bins; + + // output array of bins + Datum* result; + ArrayType* arr_out; + int16 elmlen_out; + bool elmbyval_out; + char elmalign_out; + int arr_len_out; + + int i; + + bytes_in = PG_GETARG_BYTEA_P(0); + sketchptr = kll_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); + + num_bins = PG_NARGS() > 1 ? PG_GETARG_INT32(1) : DEFAULT_NUM_BINS; + if (num_bins < 2) { + elog(ERROR, "at least two bins expected"); + } + + double* split_points = palloc(sizeof(double) * (num_bins - 1)); + const double min_value = kll_double_sketch_get_quantile(sketchptr, 0); + const double max_value = kll_double_sketch_get_quantile(sketchptr, 1); + const double delta = (max_value - min_value) / num_bins; + for (i = 0; i < num_bins - 1; i++) { + split_points[i] = min_value + delta * (i + 1); + } + result = (Datum*) kll_double_sketch_get_pmf_or_cdf(sketchptr, split_points, num_bins - 1, false, true); + pfree(split_points); + + // construct output array + arr_len_out = num_bins; + get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out); + arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out); + + kll_double_sketch_delete(sketchptr); + + PG_RETURN_ARRAYTYPE_P(arr_out); +} diff --git a/src/kll_float_sketch_c_adapter.cpp b/src/kll_float_sketch_c_adapter.cpp index b42ca30..f9e7e2c 100644 --- a/src/kll_float_sketch_c_adapter.cpp +++ b/src/kll_float_sketch_c_adapter.cpp @@ -23,7 +23,7 @@ #include <kll_sketch.hpp> -typedef datasketches::kll_sketch<float, std::less<float>, datasketches::serde<float>, palloc_allocator<float>> kll_float_sketch; +using kll_float_sketch = datasketches::kll_sketch<float, std::less<float>, datasketches::serde<float>, palloc_allocator<float>>; void* kll_float_sketch_new(unsigned k) { try { diff --git a/test/kll_double_sketch_test.sql b/test/kll_double_sketch_test.sql new file mode 100644 index 0000000..ab9fbd7 --- /dev/null +++ b/test/kll_double_sketch_test.sql @@ -0,0 +1,33 @@ +drop extension if exists datasketches cascade; +create extension datasketches; + +drop table if exists kll_sketch_test; +create table kll_sketch_test(sketch kll_double_sketch); + +-- default k +insert into kll_sketch_test + select kll_double_sketch_build(value) + from (values (1), (2), (3), (4), (5)) as t(value) +; + +-- k = 20 +insert into kll_sketch_test + select kll_double_sketch_build(value, 20) + from (values (6), (7), (8), (9), (10)) as t(value) +; + +-- get min and max values +select kll_double_sketch_get_quantiles(sketch, array[0, 1]) as min_max from kll_sketch_test; +select kll_double_sketch_to_string(sketch) from kll_sketch_test; + +-- default k, median +select kll_double_sketch_get_quantile(kll_double_sketch_merge(sketch), 0.5) as median from kll_sketch_test; +-- k = 20, rank of value 6 +select kll_double_sketch_get_rank(kll_double_sketch_merge(sketch, 20), 6) as rank from kll_sketch_test; + +select kll_double_sketch_get_pmf(kll_double_sketch_merge(sketch, 20), array[2, 5, 7]) as pmf from kll_sketch_test; +select kll_double_sketch_get_cdf(kll_double_sketch_merge(sketch, 20), array[2, 5, 7]) as cdf from kll_sketch_test; +select kll_double_sketch_get_histogram(kll_double_sketch_merge(sketch, 20), 5) as histogram from kll_sketch_test; + +drop table kll_sketch_test; +drop extension datasketches; diff --git a/test/kll_sketch_test.sql b/test/kll_float_sketch_test.sql similarity index 100% rename from test/kll_sketch_test.sql rename to test/kll_float_sketch_test.sql --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
