This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch theta in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git
commit f33856dc13a3aebe433bf6d7c658de0092cc6556 Author: AlexanderSaydakov <[email protected]> AuthorDate: Mon Jun 3 16:25:16 2019 -0700 theta draft --- Makefile | 11 +- sql/datasketches_theta_sketch.sql | 93 +++++++++++++++++ src/theta_sketch_c_adapter.cpp | 145 +++++++++++++++++++++++++ src/theta_sketch_c_adapter.h | 36 +++++++ src/theta_sketch_pg_functions.c | 215 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 495 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 6f08583..bcac339 100644 --- a/Makefile +++ b/Makefile @@ -3,17 +3,18 @@ MODULE_big = datasketches OBJS = src/base64.o src/common.o \ src/kll_float_sketch_pg_functions.o src/kll_float_sketch_c_adapter.o \ - src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o + src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \ + src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o -# assume a copy or link sketches-core-cpp in the current dir -CORE = sketches-core-cpp +# assume a copy or link datasketches-cpp in the current dir +CORE = datasketches-cpp CPC = $(CORE)/cpc/src OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o -DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql +DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql CXX = g++-8 -PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include +PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include SHLIB_LINK = -lstdc++ -L/usr/local/lib PG_CONFIG = pg_config diff --git a/sql/datasketches_theta_sketch.sql b/sql/datasketches_theta_sketch.sql new file mode 100644 index 0000000..f5200ff --- /dev/null +++ b/sql/datasketches_theta_sketch.sql @@ -0,0 +1,93 @@ +-- Copyright 2019, Verizon Media. 
+-- Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+-- Shell type: declared first so the I/O functions below can name it before the full definition exists.
+CREATE TYPE theta_sketch;
+-- Input function of the type; bound to the C symbol pg_sketch_in in the datasketches library.
+CREATE OR REPLACE FUNCTION theta_sketch_in(cstring) RETURNS theta_sketch
+  AS '$libdir/datasketches', 'pg_sketch_in'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Output function of the type; bound to the C symbol pg_sketch_out in the datasketches library.
+CREATE OR REPLACE FUNCTION theta_sketch_out(theta_sketch) RETURNS cstring
+  AS '$libdir/datasketches', 'pg_sketch_out'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Full definition of the type, wiring in the two I/O functions declared above.
+CREATE TYPE theta_sketch (
+  INPUT = theta_sketch_in,
+  OUTPUT = theta_sketch_out,
+  STORAGE = EXTERNAL
+);
+-- Assignment casts in both directions between bytea and theta_sketch, declared WITHOUT FUNCTION.
+CREATE CAST (bytea as theta_sketch) WITHOUT FUNCTION AS ASSIGNMENT;
+CREATE CAST (theta_sketch as bytea) WITHOUT FUNCTION AS ASSIGNMENT;
+-- Transition function of theta_sketch_distinct / theta_sketch_build; not STRICT, so it is called with NULL state and NULL items.
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement) RETURNS internal
+  AS '$libdir/datasketches', 'pg_theta_sketch_add_item'
+  LANGUAGE C IMMUTABLE;
+-- Three-argument form bound to the same C symbol; the C code reads the int as lg_k when it creates the state.
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_theta_sketch_add_item'
+  LANGUAGE C IMMUTABLE;
+-- Estimate computed from a stored theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate(theta_sketch) RETURNS double precision
+  AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Final function of theta_sketch_build: converts the internal aggregate state into a theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_sketch_from_internal(internal) RETURNS theta_sketch
+  AS '$libdir/datasketches', 'pg_theta_sketch_from_internal'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Final function of theta_sketch_distinct: estimate taken directly from the internal aggregate state.
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate_from_internal(internal) RETURNS double precision
+  AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate_from_internal'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Text rendering of a stored theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_sketch_to_string(theta_sketch) RETURNS TEXT
+  AS '$libdir/datasketches', 'pg_theta_sketch_to_string'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Transition function of the theta_sketch_union aggregate; not STRICT, so it is called with NULL state and NULL sketches.
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch) RETURNS internal
+  AS '$libdir/datasketches', 'pg_theta_sketch_union'
+  LANGUAGE C IMMUTABLE;
+-- Three-argument form bound to the same C symbol; the C code reads the int as lg_k when it creates the union state.
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch, int) RETURNS internal
+  AS '$libdir/datasketches', 'pg_theta_sketch_union'
+  LANGUAGE C IMMUTABLE;
+-- Final function of the theta_sketch_union aggregate: returns the union result as a theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_union_get_result(internal) RETURNS theta_sketch
+  AS '$libdir/datasketches', 'pg_theta_union_get_result'
+  LANGUAGE C STRICT IMMUTABLE;
+-- Aggregate over arbitrary elements that returns the estimate as double precision.
+CREATE AGGREGATE theta_sketch_distinct(anyelement) (
+  sfunc = theta_sketch_add_item,
+  stype = internal,
+  finalfunc = theta_sketch_get_estimate_from_internal
+);
+-- Same aggregate with an extra int argument passed through to the transition function.
+CREATE AGGREGATE theta_sketch_distinct(anyelement, int) (
+  sfunc = theta_sketch_add_item,
+  stype = internal,
+  finalfunc = theta_sketch_get_estimate_from_internal
+);
+-- Aggregate over arbitrary elements that returns the built sketch as a theta_sketch value.
+CREATE AGGREGATE theta_sketch_build(anyelement) (
+  sfunc = theta_sketch_add_item,
+  stype = internal,
+  finalfunc = theta_sketch_from_internal
+);
+-- Same aggregate with an extra int argument passed through to the transition function.
+CREATE AGGREGATE theta_sketch_build(anyelement, int) (
+  sfunc = theta_sketch_add_item,
+  stype = internal,
+  finalfunc = theta_sketch_from_internal
+);
+-- Aggregate over theta_sketch values that returns their union as a theta_sketch value.
+CREATE AGGREGATE theta_sketch_union(theta_sketch) (
+  sfunc = theta_sketch_union,
+  stype = internal,
+  finalfunc = theta_union_get_result
+);
+-- Same aggregate with an extra int argument passed through to the transition function.
+CREATE AGGREGATE theta_sketch_union(theta_sketch, int) (
+  sfunc = theta_sketch_union,
+  stype = internal,
+  finalfunc = theta_union_get_result
+);
diff --git a/src/theta_sketch_c_adapter.cpp b/src/theta_sketch_c_adapter.cpp
new file mode 100644
index 0000000..968204c
--- /dev/null
+++ b/src/theta_sketch_c_adapter.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include "theta_sketch_c_adapter.h"
+#include "allocator.h"
+
+extern "C" {
+#include <postgres.h>
+}
+
+#include <sstream>
+
+#include <theta_sketch.hpp>
+#include <theta_union.hpp>
+
+typedef datasketches::theta_sketch_alloc<palloc_allocator<void>> theta_sketch_pg;
+typedef datasketches::update_theta_sketch_alloc<palloc_allocator<void>> update_theta_sketch_pg;
+typedef datasketches::compact_theta_sketch_alloc<palloc_allocator<void>> compact_theta_sketch_pg;
+typedef datasketches::theta_union_alloc<palloc_allocator<void>> theta_union_pg;
+// Creates an update sketch from the builder defaults, placement-constructed in palloc'd memory.
+void* theta_sketch_new_default() {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().build());
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what()); // exception text is data; it must never be used as the format string
+  }
+}
+// Same as theta_sketch_new_default, but with the caller-supplied lg_k handed to the builder.
+void* theta_sketch_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().set_lg_k(lg_k).build());
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Destroys a sketch through the theta_sketch_pg base type, then pfree's its storage.
+void theta_sketch_delete(void* sketchptr) {
+  try {
+    static_cast<theta_sketch_pg*>(sketchptr)->~theta_sketch_pg(); // explicit destructor call pairs with the placement new used to construct
+    pfree(sketchptr);
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Feeds `length` bytes starting at `data` into the update sketch behind sketchptr.
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length) {
+  try {
+    static_cast<update_theta_sketch_pg*>(sketchptr)->update(data, length);
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Returns a new compact sketch built from the update sketch; the input sketch is destroyed and freed, so sketchptr is invalid afterwards.
+void* theta_sketch_compact(void* sketchptr) {
+  try {
+    auto newptr = new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<update_theta_sketch_pg*>(sketchptr)->compact());
+    static_cast<update_theta_sketch_pg*>(sketchptr)->~update_theta_sketch_pg();
+    pfree(sketchptr);
+    return newptr;
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Returns get_estimate() of the sketch behind sketchptr.
+double theta_sketch_get_estimate(const void* sketchptr) {
+  try {
+    return static_cast<const theta_sketch_pg*>(sketchptr)->get_estimate();
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Writes the sketch's to_stream() text into buffer; snprintf truncates it to at most length - 1 characters plus the terminator.
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
+  try {
+    std::stringstream s;
+    static_cast<const theta_sketch_pg*>(sketchptr)->to_stream(s);
+    snprintf(buffer, length, "%s", s.str().c_str()); // sketch text may contain '%'; copy it as data, not as a format
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Serializes the sketch and returns the buffer as a bytea* whose varlena size is set to the reported length.
+void* theta_sketch_serialize(const void* sketchptr) {
+  try {
+    auto data = static_cast<const theta_sketch_pg*>(sketchptr)->serialize(VARHDRSZ); // presumably leaves VARHDRSZ leading bytes for the header - confirm against theta_sketch::serialize
+    bytea* buffer = (bytea*) data.first.release();
+    const size_t length = data.second;
+    SET_VARSIZE(buffer, length);
+    return buffer;
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Rebuilds a sketch from `length` serialized bytes and releases ownership of it to the caller.
+void* theta_sketch_deserialize(const char* buffer, unsigned length) {
+  try {
+    auto ptr = theta_sketch_pg::deserialize(buffer, length);
+    return ptr.release();
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Creates a union object from the builder defaults, placement-constructed in palloc'd memory.
+void* theta_union_new_default() {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().build());
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Same as theta_union_new_default, but with the caller-supplied lg_k handed to the builder.
+void* theta_union_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().set_lg_k(lg_k).build());
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Destroys the union object, then pfree's its storage.
+void theta_union_delete(void* unionptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->~theta_union_pg();
+    pfree(unionptr);
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Passes the sketch behind sketchptr to the union's update(); this function does not free the sketch.
+void theta_union_update(void* unionptr, const void* sketchptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->update(*static_cast<const theta_sketch_pg*>(sketchptr));
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+// Returns the union's get_result() as a new compact sketch in palloc'd memory; the union object itself is left alive.
+void* theta_union_get_result(void* unionptr) {
+  try {
+    return new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<theta_union_pg*>(unionptr)->get_result());
+  } catch (const std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
diff --git a/src/theta_sketch_c_adapter.h b/src/theta_sketch_c_adapter.h
new file mode 100644
index 0000000..57c3df5
--- /dev/null
+++ b/src/theta_sketch_c_adapter.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#ifndef THETA_SKETCH_C_ADAPTER_H
+#define THETA_SKETCH_C_ADAPTER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+/* Sketch construction and destruction; sketches are passed across the C boundary as opaque void pointers. */
+void* theta_sketch_new_default(void); /* (void) rather than (): gives C callers a real prototype */
+void* theta_sketch_new(unsigned lg_k);
+void theta_sketch_delete(void* sketchptr);
+/* Operations on an existing sketch. */
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length);
+void* theta_sketch_compact(void* sketchptr); /* frees the input sketch and returns a new pointer */
+void theta_sketch_union(void* sketchptr1, const void* sketchptr2); /* NOTE(review): no definition in theta_sketch_c_adapter.cpp in this commit */
+double theta_sketch_get_estimate(const void* sketchptr);
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
+/* Conversion between a sketch object and its serialized bytes. */
+void* theta_sketch_serialize(const void* sketchptr);
+void* theta_sketch_deserialize(const char* buffer, unsigned length);
+/* Union object: construction, destruction, update with a sketch, and result extraction. */
+void* theta_union_new_default(void); /* (void) rather than (): gives C callers a real prototype */
+void* theta_union_new(unsigned lg_k);
+void theta_union_delete(void* unionptr);
+void theta_union_update(void* unionptr, const void* sketchptr);
+void* theta_union_get_result(void* unionptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/theta_sketch_pg_functions.c b/src/theta_sketch_pg_functions.c
new file mode 100644
index 0000000..2c19264
--- /dev/null
+++ b/src/theta_sketch_pg_functions.c
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+
+#include "theta_sketch_c_adapter.h"
+#include "base64.h"
+
+/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */
+PG_FUNCTION_INFO_V1(pg_theta_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_to_string);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_union); // was pg_theta_sketch_merge: the info record must name the function defined here and bound by the SQL script
+PG_FUNCTION_INFO_V1(pg_theta_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_union_get_result);
+
+/* function declarations */
+Datum pg_theta_sketch_recv(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_send(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS);
+// Aggregate transition function: creates the sketch on first use (optional third argument = lg_k) and updates it with one element.
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  int lg_k;
+
+  // anyelement
+  Oid element_type;
+  Datum element;
+  int16 typlen;
+  bool typbyval;
+  char typalign;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_add_item called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext); // the sketch must be allocated in the aggregate context to survive between calls
+
+  if (PG_ARGISNULL(0)) {
+    lg_k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : 0; // the two-argument SQL form passes no lg_k; 0 selects the default below
+    sketchptr = lg_k ? theta_sketch_new(lg_k) : theta_sketch_new_default();
+  } else {
+    sketchptr = PG_GETARG_POINTER(0);
+  }
+
+  element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
+  element = PG_GETARG_DATUM(1);
+  get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+  if (typlen == -1) {
+    // varlena
+    theta_sketch_update(sketchptr, VARDATA_ANY(element), VARSIZE_ANY_EXHDR(element));
+  } else if (typbyval) {
+    // fixed-length passed by value
+    theta_sketch_update(sketchptr, &element, typlen); // NOTE(review): hashes the first typlen bytes of the Datum, which depends on host byte order - confirm target platforms
+  } else {
+    // fixed-length passed by reference
+    theta_sketch_update(sketchptr, (void*)element, typlen);
+  }
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(sketchptr);
+}
+// Deserializes the bytea argument into a sketch, returns its estimate as float8 and frees the temporary sketch.
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  double estimate;
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = theta_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+  estimate = theta_sketch_get_estimate(sketchptr);
+  theta_sketch_delete(sketchptr);
+  PG_RETURN_FLOAT8(estimate);
+}
+// Deserializes the bytea argument into a sketch and returns its text rendering, limited by the 1024-byte local buffer.
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  char str[1024];
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = theta_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+  theta_sketch_to_string(sketchptr, str, 1024);
+  theta_sketch_delete(sketchptr);
+  PG_RETURN_TEXT_P(cstring_to_text(str));
+}
+// Aggregate transition function: creates the union on first use (optional third argument = lg_k) and updates it with one serialized sketch.
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
+  void* unionptr;
+  bytea* sketch_bytes;
+  void* sketchptr;
+  int lg_k;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_union called in non-aggregate context"); // message previously named theta_sketch_merge
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext); // the union must be allocated in the aggregate context to survive between calls
+
+  if (PG_ARGISNULL(0)) {
+    lg_k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : 0; // the two-argument SQL form passes no lg_k; 0 selects the default below
+    unionptr = lg_k ? theta_union_new(lg_k) : theta_union_new_default();
+  } else {
+    unionptr = PG_GETARG_POINTER(0);
+  }
+
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+  sketchptr = theta_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  theta_union_update(unionptr, sketchptr);
+  theta_sketch_delete(sketchptr); // the deserialized sketch is only needed for this one update
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(unionptr);
+}
+// Aggregate final function: compacts the update sketch held as state, serializes it to bytea and frees the sketch.
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  bytea* bytes_out;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_from_internal called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  sketchptr = PG_GETARG_POINTER(0);
+  sketchptr = theta_sketch_compact(sketchptr); // compact frees the update sketch; only the new pointer may be used from here on
+  bytes_out = theta_sketch_serialize(sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_BYTEA_P(bytes_out);
+}
+// Aggregate final function: returns the estimate of the sketch held as state and frees the sketch.
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  double estimate;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_get_estimate_from_internal called in non-aggregate context"); // message previously named theta_sketch_from_internal
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  sketchptr = PG_GETARG_POINTER(0);
+  estimate = theta_sketch_get_estimate(sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_FLOAT8(estimate);
+}
+// Aggregate final function: extracts the union result, serializes it to bytea and frees both the result sketch and the union.
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS) {
+  void* unionptr;
+  void* sketchptr;
+  bytea* bytes_out;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_union_get_result called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  unionptr = PG_GETARG_POINTER(0);
+  sketchptr = theta_union_get_result(unionptr);
+  bytes_out = theta_sketch_serialize(sketchptr);
+  theta_sketch_delete(sketchptr);
+  theta_union_delete(unionptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_BYTEA_P(bytes_out);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
