This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch theta
in repository
https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git
The following commit(s) were added to refs/heads/theta by this push:
new 161af07 non-aggregate union and intersection
161af07 is described below
commit 161af074f0457ffd14f42339b0d31a6bb23294c0
Author: AlexanderSaydakov <[email protected]>
AuthorDate: Fri Jun 7 18:04:46 2019 -0700
non-aggregate union and intersection
---
sql/datasketches_theta_sketch.sql | 20 ++++++++----
src/theta_sketch_c_adapter.cpp | 35 ++++++++++++++++++++
src/theta_sketch_c_adapter.h | 5 +++
src/theta_sketch_pg_functions.c | 68 +++++++++++++++++++++++++++++++++++++--
4 files changed, 119 insertions(+), 9 deletions(-)
diff --git a/sql/datasketches_theta_sketch.sql
b/sql/datasketches_theta_sketch.sql
index f5200ff..c060c54 100644
--- a/sql/datasketches_theta_sketch.sql
+++ b/sql/datasketches_theta_sketch.sql
@@ -44,12 +44,12 @@ CREATE OR REPLACE FUNCTION
theta_sketch_to_string(theta_sketch) RETURNS TEXT
AS '$libdir/datasketches', 'pg_theta_sketch_to_string'
LANGUAGE C STRICT IMMUTABLE;
-CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch) RETURNS
internal
- AS '$libdir/datasketches', 'pg_theta_sketch_union'
+CREATE OR REPLACE FUNCTION theta_sketch_union_agg(internal, theta_sketch)
RETURNS internal
+ AS '$libdir/datasketches', 'pg_theta_sketch_union_agg'
LANGUAGE C IMMUTABLE;
-CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch, int)
RETURNS internal
- AS '$libdir/datasketches', 'pg_theta_sketch_union'
+CREATE OR REPLACE FUNCTION theta_sketch_union_agg(internal, theta_sketch, int)
RETURNS internal
+ AS '$libdir/datasketches', 'pg_theta_sketch_union_agg'
LANGUAGE C IMMUTABLE;
CREATE OR REPLACE FUNCTION theta_union_get_result(internal) RETURNS
theta_sketch
@@ -81,13 +81,21 @@ CREATE AGGREGATE theta_sketch_build(anyelement, int) (
);
CREATE AGGREGATE theta_sketch_union(theta_sketch) (
- sfunc = theta_sketch_union,
+ sfunc = theta_sketch_union_agg,
stype = internal,
finalfunc = theta_union_get_result
);
CREATE AGGREGATE theta_sketch_union(theta_sketch, int) (
- sfunc = theta_sketch_union,
+ sfunc = theta_sketch_union_agg,
stype = internal,
finalfunc = theta_union_get_result
);
+
+-- Non-aggregate union of two theta sketches, implemented by the C entry point
+-- pg_theta_sketch_union. STRICT: PostgreSQL returns NULL without calling the
+-- C function when either argument is NULL.
+CREATE OR REPLACE FUNCTION theta_sketch_union(theta_sketch, theta_sketch) RETURNS theta_sketch
+    AS '$libdir/datasketches', 'pg_theta_sketch_union'
+    LANGUAGE C IMMUTABLE STRICT;
+
+-- Non-aggregate intersection of two theta sketches, implemented by the C entry
+-- point pg_theta_sketch_intersection. STRICT: PostgreSQL returns NULL without
+-- calling the C function when either argument is NULL.
+CREATE OR REPLACE FUNCTION theta_sketch_intersection(theta_sketch, theta_sketch) RETURNS theta_sketch
+    AS '$libdir/datasketches', 'pg_theta_sketch_intersection'
+    LANGUAGE C IMMUTABLE STRICT;
diff --git a/src/theta_sketch_c_adapter.cpp b/src/theta_sketch_c_adapter.cpp
index 968204c..b1bd47b 100644
--- a/src/theta_sketch_c_adapter.cpp
+++ b/src/theta_sketch_c_adapter.cpp
@@ -14,11 +14,13 @@ extern "C" {
#include <theta_sketch.hpp>
#include <theta_union.hpp>
+#include <theta_intersection.hpp>
typedef datasketches::theta_sketch_alloc<palloc_allocator<void>>
theta_sketch_pg;
typedef datasketches::update_theta_sketch_alloc<palloc_allocator<void>>
update_theta_sketch_pg;
typedef datasketches::compact_theta_sketch_alloc<palloc_allocator<void>>
compact_theta_sketch_pg;
typedef datasketches::theta_union_alloc<palloc_allocator<void>> theta_union_pg;
+typedef datasketches::theta_intersection_alloc<palloc_allocator<void>>
theta_intersection_pg;
void* theta_sketch_new_default() {
try {
@@ -143,3 +145,36 @@ void* theta_union_get_result(void* unionptr) {
elog(ERROR, e.what());
}
}
+
+// Default-constructs a theta_intersection_pg in memory obtained from palloc
+// and returns it as an opaque pointer. Pair with theta_intersection_delete().
+// A std::exception thrown during construction is reported via elog(ERROR).
+void* theta_intersection_new_default() {
+  try {
+    return new (palloc(sizeof(theta_intersection_pg))) theta_intersection_pg;
+  } catch (const std::exception& e) {
+    // Log through an explicit "%s": elog() treats its second argument as a
+    // printf-style format, so a '%' inside e.what() must not reach it raw.
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Runs the theta_intersection_pg destructor on the object behind interptr and
+// then releases its storage with pfree().
+// interptr: expected to point at a theta_intersection_pg placed in palloc'd
+//           memory (as theta_intersection_new_default() produces).
+// A std::exception thrown on the way is reported via elog(ERROR).
+void theta_intersection_delete(void* interptr) {
+  try {
+    static_cast<theta_intersection_pg*>(interptr)->~theta_intersection_pg();
+    pfree(interptr);
+  } catch (const std::exception& e) {
+    // Log through an explicit "%s": elog() treats its second argument as a
+    // printf-style format, so a '%' inside e.what() must not reach it raw.
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Feeds one sketch into an intersection by calling its update() method.
+// interptr:  treated as a theta_intersection_pg*; this is the object updated.
+// sketchptr: treated as a const theta_sketch_pg*; passed to update() by
+//            const reference.
+// A std::exception thrown by update() is reported via elog(ERROR).
+void theta_intersection_update(void* interptr, const void* sketchptr) {
+  try {
+    static_cast<theta_intersection_pg*>(interptr)->update(*static_cast<const theta_sketch_pg*>(sketchptr));
+  } catch (const std::exception& e) {
+    // Log through an explicit "%s": elog() treats its second argument as a
+    // printf-style format, so a '%' inside e.what() must not reach it raw.
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Builds a compact_theta_sketch_pg from the intersection's get_result() in
+// memory obtained from palloc and returns it as an opaque pointer.
+// interptr: treated as a theta_intersection_pg*.
+// A std::exception thrown while producing the result is reported via
+// elog(ERROR).
+void* theta_intersection_get_result(void* interptr) {
+  try {
+    return new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<theta_intersection_pg*>(interptr)->get_result());
+  } catch (const std::exception& e) {
+    // Log through an explicit "%s": elog() treats its second argument as a
+    // printf-style format, so a '%' inside e.what() must not reach it raw.
+    elog(ERROR, "%s", e.what());
+  }
+}
diff --git a/src/theta_sketch_c_adapter.h b/src/theta_sketch_c_adapter.h
index 57c3df5..2cf9dc9 100644
--- a/src/theta_sketch_c_adapter.h
+++ b/src/theta_sketch_c_adapter.h
@@ -29,6 +29,11 @@ void theta_union_delete(void* unionptr);
void theta_union_update(void* unionptr, const void* sketchptr);
void* theta_union_get_result(void* unionptr);
+/*
+ * Theta intersection, exposed through opaque pointers.
+ * The no-argument function is declared with (void): in C an empty parameter
+ * list leaves the parameters unspecified, whereas (void) is a real prototype
+ * and is equally valid when this header is compiled as C++.
+ */
+void* theta_intersection_new_default(void);
+void theta_intersection_delete(void* interptr);
+void theta_intersection_update(void* interptr, const void* sketchptr);
+void* theta_intersection_get_result(void* interptr);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/theta_sketch_pg_functions.c b/src/theta_sketch_pg_functions.c
index 52747ab..7de5107 100644
--- a/src/theta_sketch_pg_functions.c
+++ b/src/theta_sketch_pg_functions.c
@@ -15,10 +15,12 @@
PG_FUNCTION_INFO_V1(pg_theta_sketch_add_item);
PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate);
PG_FUNCTION_INFO_V1(pg_theta_sketch_to_string);
-PG_FUNCTION_INFO_V1(pg_theta_sketch_union);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_union_agg);
PG_FUNCTION_INFO_V1(pg_theta_sketch_from_internal);
PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_from_internal);
PG_FUNCTION_INFO_V1(pg_theta_union_get_result);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_union);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_intersection);
/* function declarations */
Datum pg_theta_sketch_recv(PG_FUNCTION_ARGS);
@@ -26,10 +28,12 @@ Datum pg_theta_sketch_send(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS);
-Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_union_agg(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
Datum pg_theta_union_get_result(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_intersection(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS) {
void* sketchptr;
@@ -104,7 +108,7 @@ Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS) {
PG_RETURN_TEXT_P(cstring_to_text(str));
}
-Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
+Datum pg_theta_sketch_union_agg(PG_FUNCTION_ARGS) {
void* unionptr;
bytea* sketch_bytes;
void* sketchptr;
@@ -213,3 +217,61 @@ Datum pg_theta_union_get_result(PG_FUNCTION_ARGS) {
PG_RETURN_BYTEA_P(bytes_out);
}
+
+/*
+ * Non-aggregate union of two serialized theta sketches.
+ *
+ * Arguments 0 and 1 are handled in order: each non-NULL one is fetched as
+ * bytea, deserialized, fed into a freshly created default union and then
+ * released. The union result is serialized and returned as bytea.
+ */
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
+  void* u;
+  void* in_sketch;
+  void* out_sketch;
+  const bytea* raw;
+  bytea* serialized;
+  int argno;
+
+  u = theta_union_new_default();
+  for (argno = 0; argno < 2; argno++) {
+    if (PG_ARGISNULL(argno)) continue;
+    raw = PG_GETARG_BYTEA_P(argno);
+    /* the payload follows the varlena header, hence the VARHDRSZ adjustment */
+    in_sketch = theta_sketch_deserialize(VARDATA(raw), VARSIZE(raw) - VARHDRSZ);
+    theta_union_update(u, in_sketch);
+    theta_sketch_delete(in_sketch);
+  }
+
+  out_sketch = theta_union_get_result(u);
+  theta_union_delete(u);
+  serialized = theta_sketch_serialize(out_sketch);
+  theta_sketch_delete(out_sketch);
+  PG_RETURN_BYTEA_P(serialized);
+}
+
+/*
+ * Non-aggregate intersection of two serialized theta sketches.
+ *
+ * Arguments 0 and 1 are handled in order: each non-NULL one is fetched as
+ * bytea, deserialized, fed into a freshly created default intersection and
+ * then released. The intersection result is serialized and returned as bytea.
+ */
+Datum pg_theta_sketch_intersection(PG_FUNCTION_ARGS) {
+  void* isect;
+  void* in_sketch;
+  void* out_sketch;
+  const bytea* raw;
+  bytea* serialized;
+  int argno;
+
+  isect = theta_intersection_new_default();
+  for (argno = 0; argno < 2; argno++) {
+    if (PG_ARGISNULL(argno)) continue;
+    raw = PG_GETARG_BYTEA_P(argno);
+    /* the payload follows the varlena header, hence the VARHDRSZ adjustment */
+    in_sketch = theta_sketch_deserialize(VARDATA(raw), VARSIZE(raw) - VARHDRSZ);
+    theta_intersection_update(isect, in_sketch);
+    theta_sketch_delete(in_sketch);
+  }
+
+  out_sketch = theta_intersection_get_result(isect);
+  theta_intersection_delete(isect);
+  serialized = theta_sketch_serialize(out_sketch);
+  theta_sketch_delete(out_sketch);
+  PG_RETURN_BYTEA_P(serialized);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]