This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-bigquery.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fb7dd5  cpc sketch
3fb7dd5 is described below

commit 3fb7dd5c551d75b3fecba7152cddd4cada31bbe0
Author: AlexanderSaydakov <[email protected]>
AuthorDate: Mon Aug 5 14:20:22 2024 -0700

    cpc sketch
---
 Makefile                                |   3 +-
 cpc_sketch.cpp                          |  92 ++++++++++++++++++++++++
 cpc_sketch_agg_string.sql               | 121 ++++++++++++++++++++++++++++++++
 Makefile => cpc_sketch_get_estimate.sql |  41 ++++-------
 4 files changed, 229 insertions(+), 28 deletions(-)

diff --git a/Makefile b/Makefile
index a5083a4..bd6f65f 100644
--- a/Makefile
+++ b/Makefile
@@ -18,6 +18,7 @@
 EMCC=emcc
 EMCFLAGS=-Idatasketches-cpp/common/include \
       -Idatasketches-cpp/theta/include \
+       -Idatasketches-cpp/cpc/include \
       --no-entry \
       -sWASM_BIGINT=1 \
       -sEXPORTED_FUNCTIONS=[_malloc,_free] \
@@ -26,7 +27,7 @@ EMCFLAGS=-Idatasketches-cpp/common/include \
       -O3 \
       --bind
 
-all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm
+all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm cpc_sketch.mjs cpc_sketch.js cpc_sketch.wasm
 
 %.mjs: %.cpp
       $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
diff --git a/cpc_sketch.cpp b/cpc_sketch.cpp
new file mode 100644
index 0000000..536dd42
--- /dev/null
+++ b/cpc_sketch.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <strstream>
+#include <emscripten/bind.h>
+
+#include <cpc_sketch.hpp>
+#include <cpc_union.hpp>
+
+#include "base64.hpp"
+
+// JS Uint8Array constructor. Serialized bytes are copied into a new Uint8Array
+// because the C++ vector backing the typed_memory_view is destroyed on return.
+const emscripten::val Uint8Array = emscripten::val::global("Uint8Array");
+
+EMSCRIPTEN_BINDINGS(cpc_sketch) {
+
+  // Returns what() of the exception located at ptr.
+  // NOTE(review): assumes ptr is the address of a std::exception thrown from C++
+  // and handed to a JS catch block; any other value is undefined behavior.
+  emscripten::function("getExceptionMessage", emscripten::optional_override([](intptr_t ptr) {
+    return std::string(reinterpret_cast<std::exception*>(ptr)->what());
+  }));
+
+  emscripten::class_<datasketches::cpc_sketch>("cpc_sketch")
+    .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) {
+      return new datasketches::cpc_sketch(lg_k, seed);
+    }))
+    .function("updateString", emscripten::select_overload<void(const std::string&)>(&datasketches::cpc_sketch::update))
+    .function("serializeAsUint8Array", emscripten::optional_override([](const datasketches::cpc_sketch& self) {
+      auto bytes = self.serialize();
+      return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data()));
+    }))
+    // Decodes base64, then deserializes. Returns a new heap object the JS caller must delete().
+    .class_function("deserializeFromB64", emscripten::optional_override([](const std::string& b64, uint64_t seed) {
+      std::vector<char> bytes(b64_dec_len(b64.data(), b64.size()));
+      b64_decode(b64.data(), b64.size(), bytes.data());
+      return new datasketches::cpc_sketch(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    // Deserializes raw bytes. Returns a new heap object the JS caller must delete().
+    .class_function("deserializeFromBytes", emscripten::optional_override([](const std::string& bytes, uint64_t seed) {
+      return new datasketches::cpc_sketch(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    .function("getEstimate", &datasketches::cpc_sketch::get_estimate)
+    .function("getLowerBound", &datasketches::cpc_sketch::get_lower_bound)
+    .function("getUpperBound", &datasketches::cpc_sketch::get_upper_bound)
+    .function("toString", &datasketches::cpc_sketch::to_string)
+    .class_function("getMaxSerializedSizeBytes", &datasketches::cpc_sketch::get_max_serialized_size_bytes)
+    ;
+
+  emscripten::class_<datasketches::cpc_union>("cpc_union")
+    .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) {
+      return new datasketches::cpc_union(lg_k, seed);
+    }))
+    // Merges a live cpc_sketch object directly, without a serialize/deserialize round trip.
+    .function("updateWithSketch", emscripten::optional_override([](datasketches::cpc_union& self, const datasketches::cpc_sketch& sketch) {
+      self.update(sketch);
+    }))
+    .function("updateWithBytes", emscripten::optional_override([](datasketches::cpc_union& self, const std::string& bytes, uint64_t seed) {
+      self.update(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    .function("updateWithB64", emscripten::optional_override([](datasketches::cpc_union& self, const std::string& b64, uint64_t seed) {
+      std::vector<char> bytes(b64_dec_len(b64.data(), b64.size()));
+      b64_decode(b64.data(), b64.size(), bytes.data());
+      self.update(datasketches::cpc_sketch::deserialize(bytes.data(), bytes.size(), seed));
+    }), emscripten::allow_raw_pointers())
+    // Deserializes `size` bytes starting at WASM heap address `bytes`.
+    .function("updateWithBuffer", emscripten::optional_override([](datasketches::cpc_union& self, intptr_t bytes, size_t size, uint64_t seed) {
+      self.update(datasketches::cpc_sketch::deserialize(reinterpret_cast<void*>(bytes), size, seed));
+    }))
+    // Serializes the union result into the caller's buffer [bytes, bytes + size) and
+    // returns the number of bytes written. If the buffer is too small the stream fails
+    // and tellp() yields -1, which is returned unchanged.
+    .function("getResultStream", emscripten::optional_override([](datasketches::cpc_union& self, intptr_t bytes, size_t size) {
+      std::strstream stream(reinterpret_cast<char*>(bytes), size);
+      self.get_result().serialize(stream);
+      return static_cast<int>(stream.tellp());
+    }))
+    .function("getResultAsUint8Array", emscripten::optional_override([](datasketches::cpc_union& self) {
+      auto bytes = self.get_result().serialize();
+      return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data()));
+    }))
+    // Returns the serialized union result as a base64 string.
+    .function("getResultB64", emscripten::optional_override([](datasketches::cpc_union& self) {
+      auto bytes = self.get_result().serialize();
+      std::vector<char> b64(b64_enc_len(bytes.size()));
+      b64_encode(reinterpret_cast<const char*>(bytes.data()), bytes.size(), b64.data());
+      return std::string(b64.data(), b64.size());
+    }))
+    ;
+
+}
diff --git a/cpc_sketch_agg_string.sql b/cpc_sketch_agg_string.sql
new file mode 100644
index 0000000..8352c36
--- /dev/null
+++ b/cpc_sketch_agg_string.sql
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CREATE OR REPLACE AGGREGATE FUNCTION `$BQ_PROJECT.$BQ_DATASET`.cpc_sketch_agg_string(str STRING, params STRUCT<lg_k INT, seed INT64> NOT AGGREGATE) RETURNS BYTES LANGUAGE js
+OPTIONS (library=["gs://$GCS_BUCKET/cpc_sketch.mjs"]) AS R"""
+import ModuleFactory from "gs://$GCS_BUCKET/cpc_sketch.mjs";
+var Module = await ModuleFactory();
+const default_lg_k = Number(12);
+const default_seed = BigInt(9001);
+
+// Releases the WASM objects held by the state; Embind objects must be freed with delete().
+function destroyState(state) {
+  if (state.sketch) {
+    state.sketch.delete();
+    state.sketch = null;
+  }
+  if (state.union) {
+    state.union.delete();
+    state.union = null;
+  }
+  state.serialized = null;
+}
+
+// UDAF interface
+export function initialState(params) {
+  var state = {
+    lg_k: params.lg_k == null ? default_lg_k : Number(params.lg_k),
+    seed: params.seed == null ? default_seed : BigInt(params.seed),
+    sketch: null,
+    union: null,
+    serialized: null
+  };
+  state.sketch = new Module.cpc_sketch(state.lg_k, state.seed);
+  return state;
+}
+
+export function aggregate(state, str) {
+  if (state.sketch == null) {
+    state.sketch = new Module.cpc_sketch(state.lg_k, state.seed);
+  }
+  state.sketch.updateString(str);
+}
+
+// Packs the state into {lg_k, seed, bytes}; always releases the state's WASM objects.
+export function serialize(state) {
+  try {
+    if (state.sketch != null && state.serialized != null) {
+      // merge aggregated and serialized state; the live sketch is fed to the union as bytes
+      var u = new Module.cpc_union(state.lg_k, state.seed);
+      try {
+        u.updateWithBytes(state.sketch.serializeAsUint8Array(), state.seed);
+        u.updateWithBytes(state.serialized, state.seed);
+        state.serialized = u.getResultAsUint8Array();
+      } finally {
+        u.delete();
+      }
+    } else if (state.sketch != null) {
+      state.serialized = state.sketch.serializeAsUint8Array();
+    } else if (state.union != null) {
+      state.serialized = state.union.getResultAsUint8Array();
+    } else if (state.serialized == null) {
+      // JSON.stringify throws a TypeError on BigInt values such as state.seed
+      throw new Error("Unexpected state in serialization " +
+        JSON.stringify(state, (key, value) => typeof value === "bigint" ? value.toString() : value));
+    }
+    return {
+      lg_k: state.lg_k,
+      seed: state.seed,
+      bytes: state.serialized
+    };
+  } finally {
+    destroyState(state);
+  }
+}
+
+export function deserialize(serialized) {
+  return {
+    sketch: null,
+    union: null,
+    serialized: serialized.bytes,
+    lg_k: serialized.lg_k,
+    seed: serialized.seed
+  };
+}
+
+// Accumulates the serialized sketches of state and other_state into state.union.
+export function merge(state, other_state) {
+  if (!state.union) {
+    state.union = new Module.cpc_union(state.lg_k, state.seed);
+  }
+  if (state.sketch || other_state.sketch) {
+    throw new Error("sketch is not expected in merge");
+  }
+  if (other_state.union) {
+    throw new Error("other_state should not have union in merge");
+  }
+  if (state.serialized) {
+    state.union.updateWithBytes(state.serialized, state.seed);
+    state.serialized = null;
+  }
+  if (other_state.serialized) {
+    state.union.updateWithBytes(other_state.serialized, other_state.seed);
+    other_state.serialized = null;
+  } else {
+    throw new Error("other_state should have serialized sketch in merge");
+  }
+}
+
+export function finalize(state) {
+  return serialize(state).bytes;
+}
+""";
diff --git a/Makefile b/cpc_sketch_get_estimate.sql
similarity index 58%
copy from Makefile
copy to cpc_sketch_get_estimate.sql
index a5083a4..315de62 100644
--- a/Makefile
+++ b/cpc_sketch_get_estimate.sql
@@ -15,30 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-EMCC=emcc
-EMCFLAGS=-Idatasketches-cpp/common/include \
-       -Idatasketches-cpp/theta/include \
-       --no-entry \
-       -sWASM_BIGINT=1 \
-       -sEXPORTED_FUNCTIONS=[_malloc,_free] \
-       -sENVIRONMENT=shell \
-       -sTOTAL_MEMORY=1024MB \
-       -O3 \
-       --bind
-
-all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm
-
-%.mjs: %.cpp
-       $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
-
-# this rule creates a non-es6 loadable library
-%.js: %.cpp
-       $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@
-
-%.wasm: %.cpp
-       $(EMCC) $< $(EMCFLAGS) -sSTANDALONE_WASM=1 -o $@
-
-clean:
-       $(RM) *.mjs *.js *.wasm
-
-.PHONY: clean
+CREATE OR REPLACE FUNCTION `$BQ_PROJECT.$BQ_DATASET`.cpc_sketch_get_estimate(base64 BYTES, seed INT64) RETURNS FLOAT64 LANGUAGE js
+OPTIONS (library=["gs://$GCS_BUCKET/cpc_sketch.js"]) AS R"""
+const default_seed = BigInt(9001);
+try {
+  // Only NULL selects the default seed (matching cpc_sketch_agg_string); 0 is passed through.
+  var sketch = Module.cpc_sketch.deserializeFromB64(base64, seed == null ? default_seed : BigInt(seed));
+  try {
+    return sketch.getEstimate();
+  } finally {
+    sketch.delete();
+  }
+} catch (e) {
+  // Assumes a C++ exception arrives here as a numeric pointer; translate it to its
+  // message and rethrow any other error (e.g. a JS TypeError) unchanged.
+  if (typeof e === "number") {
+    throw new Error(Module.getExceptionMessage(e));
+  }
+  throw e;
+}
+""";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to