This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-bigquery.git
The following commit(s) were added to refs/heads/main by this push:
new 3fb7dd5 cpc sketch
3fb7dd5 is described below
commit 3fb7dd5c551d75b3fecba7152cddd4cada31bbe0
Author: AlexanderSaydakov <[email protected]>
AuthorDate: Mon Aug 5 14:20:22 2024 -0700
cpc sketch
---
Makefile | 3 +-
cpc_sketch.cpp | 92 ++++++++++++++++++++++++
cpc_sketch_agg_string.sql | 121 ++++++++++++++++++++++++++++++++
Makefile => cpc_sketch_get_estimate.sql | 41 ++++-------
4 files changed, 229 insertions(+), 28 deletions(-)
diff --git a/Makefile b/Makefile
index a5083a4..bd6f65f 100644
--- a/Makefile
+++ b/Makefile
@@ -18,6 +18,7 @@
EMCC=emcc
EMCFLAGS=-Idatasketches-cpp/common/include \
-Idatasketches-cpp/theta/include \
+ -Idatasketches-cpp/cpc/include \
--no-entry \
-sWASM_BIGINT=1 \
-sEXPORTED_FUNCTIONS=[_malloc,_free] \
@@ -26,7 +27,7 @@ EMCFLAGS=-Idatasketches-cpp/common/include \
-O3 \
--bind
-all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm
+all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm cpc_sketch.mjs cpc_sketch.js cpc_sketch.wasm
%.mjs: %.cpp
$(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
diff --git a/cpc_sketch.cpp b/cpc_sketch.cpp
new file mode 100644
index 0000000..536dd42
--- /dev/null
+++ b/cpc_sketch.cpp
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <strstream>
+#include <emscripten/bind.h>
+
+#include <cpc_sketch.hpp>
+#include <cpc_union.hpp>
+
+#include "base64.hpp"
+
+// Handle to the JavaScript Uint8Array constructor; used to copy serialized
+// bytes out of WASM memory into a JS-owned array.
+const emscripten::val Uint8Array = emscripten::val::global("Uint8Array");
+
+EMSCRIPTEN_BINDINGS(cpc_sketch) {
+
+  // Returns what() of a C++ exception whose address was surfaced to JavaScript.
+  // NOTE(review): assumes ptr really points at a std::exception-derived object;
+  // any other value is undefined behavior.
+  emscripten::function("getExceptionMessage", emscripten::optional_override([](intptr_t ptr) {
+    return std::string(reinterpret_cast<std::exception*>(ptr)->what());
+  }));
+
+  emscripten::class_<datasketches::cpc_sketch>("cpc_sketch")
+    .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) {
+      return new datasketches::cpc_sketch(lg_k, seed);
+    }))
+    .function("updateString", emscripten::select_overload<void(const std::string&)>(&datasketches::cpc_sketch::update))
+    // new Uint8Array(view) copies the bytes, so the result outlives the local vector.
+    .function("serializeAsUint8Array", emscripten::optional_override([](const datasketches::cpc_sketch& self) {
+      auto bytes = self.serialize();
+      return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data()));
+    }))
+    // Decodes base64 text, then deserializes; the caller owns the returned sketch.
+    .class_function("deserializeFromB64", emscripten::optional_override([](const std::string& b64, uint64_t seed) {
+      std::vector<char> bytes(b64_dec_len(b64.data(), b64.size()));
+      b64_decode(b64.data(), b64.size(), bytes.data());
+      return new datasketches::cpc_sketch(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    // Deserializes raw bytes; the caller owns the returned sketch.
+    .class_function("deserializeFromBytes", emscripten::optional_override([](const std::string& bytes, uint64_t seed) {
+      return new datasketches::cpc_sketch(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    .function("getEstimate", &datasketches::cpc_sketch::get_estimate)
+    .function("getLowerBound", &datasketches::cpc_sketch::get_lower_bound)
+    .function("getUpperBound", &datasketches::cpc_sketch::get_upper_bound)
+    .function("toString", &datasketches::cpc_sketch::to_string)
+    .class_function("getMaxSerializedSizeBytes", &datasketches::cpc_sketch::get_max_serialized_size_bytes)
+    ;
+
+  emscripten::class_<datasketches::cpc_union>("cpc_union")
+    .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) {
+      return new datasketches::cpc_union(lg_k, seed);
+    }))
+    // Merges a live cpc_sketch object directly, avoiding a serialize/deserialize round trip.
+    .function("updateWithSketch", emscripten::optional_override([](datasketches::cpc_union& self, const datasketches::cpc_sketch& sketch) {
+      self.update(sketch);
+    }))
+    .function("updateWithBytes", emscripten::optional_override([](datasketches::cpc_union& self, const std::string& bytes, uint64_t seed) {
+      self.update(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    .function("updateWithB64", emscripten::optional_override([](datasketches::cpc_union& self, const std::string& b64, uint64_t seed) {
+      std::vector<char> bytes(b64_dec_len(b64.data(), b64.size()));
+      b64_decode(b64.data(), b64.size(), bytes.data());
+      self.update(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    // bytes is an address inside WASM memory (e.g. from _malloc) holding size bytes.
+    .function("updateWithBuffer", emscripten::optional_override([](datasketches::cpc_union& self, intptr_t bytes, size_t size, uint64_t seed) {
+      self.update(datasketches::cpc_sketch::deserialize(reinterpret_cast<void*>(bytes), size, seed));
+    }))
+    // Serializes the union result into the caller-supplied WASM buffer
+    // [bytes, bytes + size) and returns the number of bytes written. If the
+    // buffer is too small the stream fails and tellp() yields -1.
+    .function("getResultStream", emscripten::optional_override([](datasketches::cpc_union& self, intptr_t bytes, size_t size) {
+      std::strstream stream(reinterpret_cast<char*>(bytes), size);
+      self.get_result().serialize(stream);
+      return (int) stream.tellp();
+    }))
+    .function("getResultAsUint8Array", emscripten::optional_override([](datasketches::cpc_union& self) {
+      auto bytes = self.get_result().serialize();
+      return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data()));
+    }))
+    .function("getResultB64", emscripten::optional_override([](datasketches::cpc_union& self) {
+      auto bytes = self.get_result().serialize();
+      std::vector<char> b64(b64_enc_len(bytes.size()));
+      b64_encode((const char*) bytes.data(), bytes.size(), b64.data());
+      return std::string(b64.data(), b64.size());
+    }))
+    ;
+
+}
diff --git a/cpc_sketch_agg_string.sql b/cpc_sketch_agg_string.sql
new file mode 100644
index 0000000..8352c36
--- /dev/null
+++ b/cpc_sketch_agg_string.sql
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Aggregates STRING values into a serialized CPC sketch. lg_k and seed are
+# NOT AGGREGATE parameters; NULL values fall back to the JS defaults below.
+CREATE OR REPLACE AGGREGATE FUNCTION `$BQ_PROJECT.$BQ_DATASET`.cpc_sketch_agg_string(str STRING, params STRUCT<lg_k INT, seed INT64> NOT AGGREGATE) RETURNS BYTES LANGUAGE js
+OPTIONS (library=["gs://$GCS_BUCKET/cpc_sketch.mjs"]) AS R"""
+import ModuleFactory from "gs://$GCS_BUCKET/cpc_sketch.mjs";
+var Module = await ModuleFactory();
+const default_lg_k = Number(12);
+const default_seed = BigInt(9001);
+
+function destroyState(state) {
+ if (state.sketch) {
+ state.sketch.delete();
+ state.sketch = null;
+ }
+ if (state.union) {
+ state.union.delete();
+ state.union = null;
+ }
+ state.serialized = null;
+}
+
+// UDAF interface
+export function initialState(params) {
+ var state = {
+ lg_k: params.lg_k == null ? default_lg_k : Number(params.lg_k),
+ seed: params.seed == null ? default_seed : BigInt(params.seed),
+ sketch: null,
+ union: null,
+ serialized: null
+ };
+ state.sketch = new Module.cpc_sketch(state.lg_k, state.seed);
+ return state;
+}
+
+export function aggregate(state, str) {
+ if (state.sketch == null) {
+ state.sketch = new Module.cpc_sketch(state.lg_k, state.seed);
+ }
+ state.sketch.updateString(str);
+}
+
+export function serialize(state) {
+ try {
+ if (state.sketch != null && state.serialized != null) {
+ // merge aggregated and serialized state
+ var u = new Module.cpc_union(state.lg_k, state.seed);
+ try {
+ u.updateWithSketch(state.sketch);
+ u.updateWithBytes(state.serialized, state.seed);
+ state.serialized = u.getResultAsUint8ArrayCompressed();
+ } finally {
+ u.delete();
+ }
+ } else if (state.sketch != null) {
+ state.serialized = state.sketch.serializeAsUint8Array();
+ } else if (state.union != null) {
+ state.serialized = state.union.getResultAsUint8Array();
+ } else if (state.serialized == null) {
+ throw new Error("Unexpected state in serialization " +
JSON.stringify(state));
+ }
+ return {
+ lg_k: state.lg_k,
+ seed: state.seed,
+ bytes: state.serialized
+ };
+ } finally {
+ destroyState(state);
+ }
+}
+
+export function deserialize(serialized) {
+ return {
+ sketch: null,
+ union: null,
+ serialized: serialized.bytes,
+ lg_k: serialized.lg_k,
+ seed: serialized.seed
+ };
+}
+
+export function merge(state, other_state) {
+ if (!state.union) {
+ state.union = new Module.cpc_union(state.lg_k, state.seed);
+ }
+ if (state.sketch || other_state.sketch) {
+ throw new Error("sketch is not expected in merge");
+ }
+ if (other_state.union) {
+ throw new Error("other_state should not have union in merge");
+ }
+ if (state.serialized) {
+ state.union.updateWithBytes(state.serialized, state.seed);
+ state.serialized = null;
+ }
+ if (other_state.serialized) {
+ state.union.updateWithBytes(other_state.serialized, other_state.seed);
+ other_state.serialized = null;
+ } else {
+ throw new Error("other_state should have serialized sketch in merge");
+ }
+}
+
+export function finalize(state) {
+ return serialize(state).bytes
+}
+""";
diff --git a/Makefile b/cpc_sketch_get_estimate.sql
similarity index 58%
copy from Makefile
copy to cpc_sketch_get_estimate.sql
index a5083a4..315de62 100644
--- a/Makefile
+++ b/cpc_sketch_get_estimate.sql
@@ -15,30 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-EMCC=emcc
-EMCFLAGS=-Idatasketches-cpp/common/include \
- -Idatasketches-cpp/theta/include \
- --no-entry \
- -sWASM_BIGINT=1 \
- -sEXPORTED_FUNCTIONS=[_malloc,_free] \
- -sENVIRONMENT=shell \
- -sTOTAL_MEMORY=1024MB \
- -O3 \
- --bind
-
-all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm
-
-%.mjs: %.cpp
- $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
-
-# this rule creates a non-es6 loadable library
-%.js: %.cpp
- $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
-
-%.wasm: %.cpp
- $(EMCC) $< $(EMCFLAGS) -sSTANDALONE_WASM=1 -o $@
-
-clean:
- $(RM) *.mjs *.js *.wasm
-
-.PHONY: clean
+# Returns the distinct-count estimate of a serialized CPC sketch.
+# A NULL seed selects the default seed.
+CREATE OR REPLACE FUNCTION `$BQ_PROJECT.$BQ_DATASET`.cpc_sketch_get_estimate(base64 BYTES, seed INT64) RETURNS FLOAT64 LANGUAGE js
+OPTIONS (library=["gs://$GCS_BUCKET/cpc_sketch.js"]) AS R"""
+const default_seed = BigInt(9001);
+try {
+  // BYTES arguments reach JS UDFs base64-encoded, hence deserializeFromB64.
+  // Test for NULL explicitly: a truthiness test would also replace seed 0.
+  var sketch = Module.cpc_sketch.deserializeFromB64(base64, seed == null ? default_seed : BigInt(seed));
+  try {
+    return sketch.getEstimate();
+  } finally {
+    sketch.delete();
+  }
+} catch (e) {
+  // C++ exceptions are assumed to surface as a numeric pointer that
+  // getExceptionMessage can decode; any other error is rethrown unchanged
+  // rather than being reinterpreted as a pointer.
+  if (typeof e === "number") {
+    throw new Error(Module.getExceptionMessage(e));
+  }
+  throw e;
+}
+""";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]