This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch fix_aggs in repository https://gitbox.apache.org/repos/asf/datasketches-bigquery.git
commit 425a87a789e78ffc0a0fa0487c1738a165fc12ff Author: AlexanderSaydakov <[email protected]> AuthorDate: Tue Sep 24 12:48:56 2024 -0700 fixed agg functions --- cpc/sqlx/cpc_sketch_agg_string_lgk_seed.sqlx | 81 ++++++++++------------ cpc/sqlx/cpc_sketch_agg_union_lgk_seed.sqlx | 33 ++++----- hll/sqlx/hll_sketch_agg_string_lgk_type.sqlx | 61 ++++++++-------- hll/sqlx/hll_sketch_agg_union_lgk_type.sqlx | 5 +- kll/Makefile | 1 + kll/sqlx/kll_sketch_float_build.sqlx | 4 ++ kll/sqlx/kll_sketch_float_merge.sqlx | 4 ++ theta/sqlx/theta_sketch_agg_string_lgk_seed_p.sqlx | 2 +- theta/sqlx/theta_sketch_agg_union_lgk_seed.sqlx | 56 +++++---------- ...ple_sketch_int64_agg_int64_lgk_seed_p_mode.sqlx | 15 ++-- ...le_sketch_int64_agg_string_lgk_seed_p_mode.sqlx | 15 ++-- ...tuple_sketch_int64_agg_union_lgk_seed_mode.sqlx | 4 ++ 12 files changed, 137 insertions(+), 144 deletions(-) diff --git a/cpc/sqlx/cpc_sketch_agg_string_lgk_seed.sqlx b/cpc/sqlx/cpc_sketch_agg_string_lgk_seed.sqlx index 405d6d6..259b177 100644 --- a/cpc/sqlx/cpc_sketch_agg_string_lgk_seed.sqlx +++ b/cpc/sqlx/cpc_sketch_agg_string_lgk_seed.sqlx @@ -54,86 +54,81 @@ function destroyState(state) { // UDAF interface export function initialState(params) { - var state = { + return { lg_k: params.lg_k == null ? default_lg_k : Number(params.lg_k), - seed: params.seed == null ? default_seed : BigInt(params.seed), - sketch: null, - union: null, - serialized: null + seed: params.seed == null ? default_seed : BigInt(params.seed) }; - state.sketch = new Module.cpc_sketch(state.lg_k, state.seed); - return state; } export function aggregate(state, str) { - if (state.sketch == null) { - state.sketch = new Module.cpc_sketch(state.lg_k, state.seed); + try { + if (state.sketch == null) { + state.sketch = new Module.cpc_sketch(state.lg_k, state.seed); + } + state.sketch.updateString(str); + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } - state.sketch.updateString(str); } export function serialize(state) { + if (state.sketch == null && state.union == null) return state; // for transition deserialize-serialize try { + // for prior transition deserialize-aggregate + // merge aggregated and serialized state if (state.sketch != null && state.serialized != null) { - // merge aggregated and serialized state - var u = new Module.cpc_union(state.lg_k, state.seed); + var u = null; try { + u = new Module.cpc_union(state.lg_k, state.seed); u.updateWithSketch(state.sketch); u.updateWithBytes(state.serialized, state.seed); state.serialized = u.getResultAsUint8Array(); } finally { - u.delete(); + if (u != null) u.delete(); } } else if (state.sketch != null) { state.serialized = state.sketch.serializeAsUint8Array(); } else if (state.union != null) { state.serialized = state.union.getResultAsUint8Array(); - } else if (state.serialized == null) { - throw new Error("Unexpected state in serialization " + JSON.stringify(state)); } return { lg_k: state.lg_k, seed: state.seed, - bytes: state.serialized + serialized: state.serialized }; + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } finally { destroyState(state); } } -export function deserialize(serialized) { - return { - sketch: null, - union: null, - serialized: serialized.bytes, - lg_k: serialized.lg_k, - seed: serialized.seed - }; +export function deserialize(state) { + return state; } export function merge(state, other_state) { - if (!state.union) { - state.union = new Module.cpc_union(state.lg_k, state.seed); - } - if (state.sketch || other_state.sketch) { - throw new Error("sketch is not expected in merge"); - } - if (other_state.union) { - throw new Error("other_state should not have union in merge"); - } - if (state.serialized) { - state.union.updateWithBytes(state.serialized, state.seed); - state.serialized = null; - } - if (other_state.serialized) { - state.union.updateWithBytes(other_state.serialized, other_state.seed); - other_state.serialized = null; - } else { - throw new Error("other_state should have serialized sketch in merge"); + try { + if (state.union == null) { + state.union = new Module.cpc_union(state.lg_k, state.seed); + } + if (state.serialized) { + state.union.updateWithBytes(state.serialized, state.seed); + state.serialized = null; + } + if (other_state.serialized) { + state.union.updateWithBytes(other_state.serialized, other_state.seed); + other_state.serialized = null; + } + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } } export function finalize(state) { - return serialize(state).bytes + return serialize(state).serialized; } """; diff --git a/cpc/sqlx/cpc_sketch_agg_union_lgk_seed.sqlx b/cpc/sqlx/cpc_sketch_agg_union_lgk_seed.sqlx index 14943d2..0b8f80c 100644 --- a/cpc/sqlx/cpc_sketch_agg_union_lgk_seed.sqlx +++ b/cpc/sqlx/cpc_sketch_agg_union_lgk_seed.sqlx @@ -52,19 +52,16 @@ function ensureUnion(state) { state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } export function initialState(params) { - var state = { + return { lg_k: params.lg_k == null ? default_lg_k : Number(params.lg_k), - seed: params.seed == null ? default_seed : BigInt(params.seed), - sketch: null, - union: null, - serialized: null + seed: params.seed == null ? default_seed : BigInt(params.seed) }; - return state; } export function aggregate(state, sketch) { @@ -73,20 +70,22 @@ export function aggregate(state, sketch) { try { state.union.updateWithBytes(sketch, state.seed); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } } export function serialize(state) { + ensureUnion(state); try { - ensureUnion(state); return { lg_k: state.lg_k, seed: state.seed, - bytes: state.union.getResultAsUint8Array() + serialized: state.union.getResultAsUint8Array() }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { state.union.delete(); @@ -94,32 +93,24 @@ export function serialize(state) { } } -export function deserialize(serialized) { - return { - lg_k: serialized.lg_k, - seed: serialized.seed, - union: null, - serialized: serialized.bytes - }; +export function deserialize(state) { + return state; } export function merge(state, other_state) { - if (other_state.union) { - throw new Error("Did not expect union in other state"); - } ensureUnion(state); if (other_state.serialized != null) { try { state.union.updateWithBytes(other_state.serialized, other_state.seed); other_state.serialized = null; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } - } else { - throw new Error("Expected serialized sketch in other_state"); } } + export function finalize(state) { - return serialize(state).bytes; + return serialize(state).serialized; } """; diff --git a/hll/sqlx/hll_sketch_agg_string_lgk_type.sqlx b/hll/sqlx/hll_sketch_agg_string_lgk_type.sqlx index e188bc6..afba618 100644 --- a/hll/sqlx/hll_sketch_agg_string_lgk_type.sqlx +++ b/hll/sqlx/hll_sketch_agg_string_lgk_type.sqlx @@ -53,16 +53,10 @@ function destroyState(state) { // UDAF interface export function initialState(params) { - var state = { + return { lg_k: params.lg_k == null ? default_lg_k : Number(params.lg_k), tgt_type: params.tgt_type == null ? "" : params.tgt_type }; - try { - state.sketch = new Module.hll_sketch(state.lg_k, state.tgt_type); - return state; - } catch (e) { - throw new Error(Module.getExceptionMessage(e)); - } } export function aggregate(state, str) { @@ -72,39 +66,39 @@ export function aggregate(state, str) { } state.sketch.updateString(str); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } export function serialize(state) { - if (state.sketch == null) return state; // for transition deserialize-serialize + if (state.sketch == null && state.union == null) return state; // for transition deserialize-serialize try { - try { - // for prior transition deserialize-aggregate - // merge aggregated and serialized state - if (state.sketch != null && state.serialized != null) { - var u = null; - try { - u = new Module.hll_union(state.lg_k); - u.updateWithSketch(state.sketch); - u.updateWithBytes(state.serialized); - state.serialized = u.getResultAsUint8Array(state.tgt_type); - } finally { - if (u != null) u.delete(); - } - } else if (state.sketch != null) { - state.serialized = state.sketch.serializeAsUint8Array(); - } else if (state.union != null) { - state.serialized = state.union.getResultAsUint8Array(state.tgt_type); + // for prior transition deserialize-aggregate + // merge aggregated and serialized state + if (state.sketch != null && state.serialized != null) { + var u = null; + try { + u = new Module.hll_union(state.lg_k); + u.updateWithSketch(state.sketch); + u.updateWithBytes(state.serialized); + state.serialized = u.getResultAsUint8Array(state.tgt_type); + } finally { + if (u != null) u.delete(); } - return { - lg_k: state.lg_k, - tgt_type: state.tgt_type, - serialized: state.serialized - }; - } catch (e) { - throw new Error(Module.getExceptionMessage(e)); + } else if (state.sketch != null) { + state.serialized = state.sketch.serializeAsUint8Array(); + } else if (state.union != null) { + state.serialized = state.union.getResultAsUint8Array(state.tgt_type); } + return { + lg_k: state.lg_k, + tgt_type: state.tgt_type, + serialized: state.serialized + }; + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } finally { destroyState(state); } @@ -128,11 +122,12 @@ export function merge(state, other_state) { other_state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } export function finalize(state) { - return serialize(state).serialized + return serialize(state).serialized; } """; diff --git a/hll/sqlx/hll_sketch_agg_union_lgk_type.sqlx b/hll/sqlx/hll_sketch_agg_union_lgk_type.sqlx index b418e15..0a86e4f 100644 --- a/hll/sqlx/hll_sketch_agg_union_lgk_type.sqlx +++ b/hll/sqlx/hll_sketch_agg_union_lgk_type.sqlx @@ -51,6 +51,7 @@ function ensureUnion(state) { state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -68,6 +69,7 @@ export function aggregate(state, sketch) { try { state.union.updateWithBytes(sketch); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -83,6 +85,7 @@ export function serialize(state) { serialized: state.union.getResultAsUint8Array(state.tgt_type) }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { state.union.delete(); @@ -101,13 +104,13 @@ export function merge(state, other_state) { state.union.updateWithBytes(other_state.serialized); other_state.serialized = null; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } } export function finalize(state) { -// throw new Error("type: " + state.tgt_type); return serialize(state).serialized; } """; diff --git a/kll/Makefile b/kll/Makefile index 254feae..627d110 100644 --- a/kll/Makefile +++ b/kll/Makefile @@ -51,6 +51,7 @@ upload: all create: @for file in $(wildcard sqlx/*.sqlx); do \ + echo creating $$file; \ ../substitute_and_run.sh $$file ; \ done diff --git a/kll/sqlx/kll_sketch_float_build.sqlx b/kll/sqlx/kll_sketch_float_build.sqlx index 8bd1f43..b285be0 100644 --- a/kll/sqlx/kll_sketch_float_build.sqlx +++ b/kll/sqlx/kll_sketch_float_build.sqlx @@ -47,6 +47,7 @@ export function initialState(k) { state.sketch = new Module.kll_sketch_float(state.k); return state; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -58,6 +59,7 @@ export function aggregate(state, value) { } state.sketch.update(value); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -75,6 +77,7 @@ export function serialize(state) { serialized: state.sketch.serializeAsUint8Array() }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { state.sketch.delete(); @@ -99,6 +102,7 @@ export function merge(state, other_state) { other_state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } diff --git a/kll/sqlx/kll_sketch_float_merge.sqlx b/kll/sqlx/kll_sketch_float_merge.sqlx index 13fa8b8..4c0ece6 100644 --- a/kll/sqlx/kll_sketch_float_merge.sqlx +++ b/kll/sqlx/kll_sketch_float_merge.sqlx @@ -47,6 +47,7 @@ export function initialState(k) { state.sketch = new Module.kll_sketch_float(state.k); return state; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -58,6 +59,7 @@ export function aggregate(state, sketch) { } state.sketch.mergeBytes(sketch); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -75,6 +77,7 @@ export function serialize(state) { serialized: state.sketch.serializeAsUint8Array() }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { state.sketch.delete(); @@ -99,6 +102,7 @@ export function merge(state, other_state) { other_state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } diff --git a/theta/sqlx/theta_sketch_agg_string_lgk_seed_p.sqlx b/theta/sqlx/theta_sketch_agg_string_lgk_seed_p.sqlx index 3c84f65..fe36c75 100644 --- a/theta/sqlx/theta_sketch_agg_string_lgk_seed_p.sqlx +++ b/theta/sqlx/theta_sketch_agg_string_lgk_seed_p.sqlx @@ -73,7 +73,7 @@ export function aggregate(state, str) { } export function serialize(state) { - if (state.sketch == null) return state; // for transition deserialize-serialize + if (state.sketch == null && state.union == null) return state; // for transition deserialize-serialize try { // for prior transition deserialize-aggregate // merge aggregated and serialized state diff --git a/theta/sqlx/theta_sketch_agg_union_lgk_seed.sqlx b/theta/sqlx/theta_sketch_agg_union_lgk_seed.sqlx index dbd02b3..9f18fb2 100644 --- a/theta/sqlx/theta_sketch_agg_union_lgk_seed.sqlx +++ b/theta/sqlx/theta_sketch_agg_union_lgk_seed.sqlx @@ -66,6 +66,7 @@ function ensureUnion(state) { state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -89,22 +90,25 @@ export function aggregate(state, sketch) { Module.HEAPU8.subarray(buffer.ptr, buffer.ptr + sketch.length).set(sketch); state.union.updateWithBuffer(buffer.ptr, sketch.length, state.seed); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } } export function serialize(state) { + if (state.union == null) return state; // for transition deserialize-serialize + ensureUnion(state); try { - ensureUnion(state); reserveBuffer(Module.compact_theta_sketch.getMaxSerializedSizeBytes(state.lg_k)); var size = state.union.getResultStreamCompressed(buffer.ptr, buffer.size); return { lg_k: state.lg_k, seed: state.seed, - bytes: Module.HEAPU8.slice(buffer.ptr, buffer.ptr + size) + serialized: Module.HEAPU8.slice(buffer.ptr, buffer.ptr + size) }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { if (state.union != null) { @@ -114,47 +118,25 @@ export function serialize(state) { } } -export function deserialize(serialized) { - return { - lg_k: serialized.lg_k, - seed: serialized.seed, - union: null, - serialized: serialized.bytes - }; +export function deserialize(state) { + return state; } export function merge(state, other_state) { reserveBuffer(Module.compact_theta_sketch.getMaxSerializedSizeBytes(state.lg_k)); - if (other_state.union) { - throw new Error("Did not expect union in other state"); - } - if (state.union == null) { - state.union = new Module.theta_union(state.lg_k, state.seed); - } - if (state.serialized != null) { - try { - reserveBuffer(state.serialized.length); - Module.HEAPU8.subarray(buffer.ptr, buffer.ptr + state.serialized.length).set(state.serialized); - state.union.updateWithBuffer(buffer.ptr, state.serialized.length, state.seed); - state.serialized = null; - } catch (e) { - throw new Error(Module.getExceptionMessage(e)); - } - } - if (other_state.serialized != null) { - try { - reserveBuffer(other_state.serialized.length); - Module.HEAPU8.subarray(buffer.ptr, buffer.ptr + other_state.serialized.length).set(other_state.serialized); - state.union.updateWithBuffer(buffer.ptr, other_state.serialized.length, other_state.seed); - other_state.serialized = null; - } catch (e) { - throw new Error(Module.getExceptionMessage(e)); - } - } else { - throw new Error("Expected serialized sketch in other_state"); + ensureUnion(state); + try { + reserveBuffer(other_state.serialized.length); + Module.HEAPU8.subarray(buffer.ptr, buffer.ptr + other_state.serialized.length).set(other_state.serialized); + state.union.updateWithBuffer(buffer.ptr, other_state.serialized.length, other_state.seed); + other_state.serialized = null; + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } } + export function finalize(state) { - return serialize(state).bytes; + return serialize(state).serialized; } """; diff --git a/tuple/sqlx/tuple_sketch_int64_agg_int64_lgk_seed_p_mode.sqlx b/tuple/sqlx/tuple_sketch_int64_agg_int64_lgk_seed_p_mode.sqlx index a7e7765..89dfb7a 100644 --- a/tuple/sqlx/tuple_sketch_int64_agg_int64_lgk_seed_p_mode.sqlx +++ b/tuple/sqlx/tuple_sketch_int64_agg_int64_lgk_seed_p_mode.sqlx @@ -72,14 +72,19 @@ export function initialState(params) { } export function aggregate(state, key, value) { - if (state.sketch == null) { - state.sketch = new Module.update_tuple_sketch_int64(state.lg_k, state.seed, state.p, state.mode); + try { + if (state.sketch == null) { + state.sketch = new Module.update_tuple_sketch_int64(state.lg_k, state.seed, state.p, state.mode); + } + state.sketch.updateInt64(key, value); + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } - state.sketch.updateInt64(key, value); } export function serialize(state) { - if (state.sketch == null) return state; // for transition deserialize-serialize + if (state.sketch == null && state.union == null) return state; // for transition deserialize-serialize try { // for prior transition deserialize-aggregate // merge aggregated and serialized state @@ -105,6 +110,7 @@ export function serialize(state) { serialized: state.serialized }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { destroyState(state); @@ -129,6 +135,7 @@ export function merge(state, other_state) { other_state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } diff --git a/tuple/sqlx/tuple_sketch_int64_agg_string_lgk_seed_p_mode.sqlx b/tuple/sqlx/tuple_sketch_int64_agg_string_lgk_seed_p_mode.sqlx index f854325..4f88df3 100644 --- a/tuple/sqlx/tuple_sketch_int64_agg_string_lgk_seed_p_mode.sqlx +++ b/tuple/sqlx/tuple_sketch_int64_agg_string_lgk_seed_p_mode.sqlx @@ -72,14 +72,19 @@ export function initialState(params) { } export function aggregate(state, key, value) { - if (state.sketch == null) { - state.sketch = new Module.update_tuple_sketch_int64(state.lg_k, state.seed, state.p, state.mode); + try { + if (state.sketch == null) { + state.sketch = new Module.update_tuple_sketch_int64(state.lg_k, state.seed, state.p, state.mode); + } + state.sketch.updateString(key, value); + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); } - state.sketch.updateString(key, value); } export function serialize(state) { - if (state.sketch == null) return state; // for transition deserialize-serialize + if (state.sketch == null && state.union == null) return state; // for transition deserialize-serialize try { // for prior transition deserialize-aggregate // merge aggregated and serialized state @@ -105,6 +110,7 @@ export function serialize(state) { serialized: state.serialized }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { destroyState(state); @@ -129,6 +135,7 @@ export function merge(state, other_state) { other_state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } diff --git a/tuple/sqlx/tuple_sketch_int64_agg_union_lgk_seed_mode.sqlx b/tuple/sqlx/tuple_sketch_int64_agg_union_lgk_seed_mode.sqlx index 8115f49..f8f6aa2 100644 --- a/tuple/sqlx/tuple_sketch_int64_agg_union_lgk_seed_mode.sqlx +++ b/tuple/sqlx/tuple_sketch_int64_agg_union_lgk_seed_mode.sqlx @@ -51,6 +51,7 @@ function ensureUnion(state) { state.serialized = null; } } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -69,6 +70,7 @@ export function aggregate(state, sketch) { try { state.union.updateWithBytes(sketch, state.seed); } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } @@ -88,6 +90,7 @@ export function serialize(state) { serialized: state.union.getResultAsUint8Array() }; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } finally { state.union.delete(); @@ -106,6 +109,7 @@ export function merge(state, other_state) { state.union.updateWithBytes(other_state.serialized, other_state.seed); other_state.serialized = null; } catch (e) { + if (e.message != null) throw e; throw new Error(Module.getExceptionMessage(e)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
