chufucun commented on pull request #185: URL: https://github.com/apache/datasketches-cpp/pull/185#issuecomment-769525196
The reason for submitting this pull is that the intermediate result (dst) can only initialize one type (eg: update_theta_sketch), and theta_union.get_result() returns compact_theta_sketch. This type is inconsistent with the initialization type (update_theta_sketch), and the merge phase needs to reallocate space each time. Observing that [datasketches-hive](https://github.com/apache/datasketches-hive) is implemented with union, which led to the idea of adding a new update function to theta_union and using theta_union to implement the `ds_theta_sketch()`. https://github.com/apache/datasketches-hive/blob/master/src/main/java/org/apache/datasketches/hive/theta/DataToSketchUDAF.java#L126 https://github.com/apache/datasketches-hive/blob/master/src/main/java/org/apache/datasketches/hive/theta/UnionState.java#L66 The new update function added in theta_union is used in the **update phase**。 Complete Code: Note: use `update_theta_sketch` is similar to this but the merge phase code is different. ```c++ // Init phase:Initialize intermediate results (dst) for merge and update phase void AggregateFunctions::DsThetaInit(FunctionContext* ctx, StringVal* dst) { // Allocate space AllocBuffer(ctx, dst, sizeof(datasketches::theta_union)); if (UNLIKELY(dst->is_null)) { DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); return; } // Constructs object initialization memor datasketches::theta_union* sketch_ptr = reinterpret_cast<datasketches::theta_union*>(dst->ptr); datasketches::theta_union sketch = datasketches::theta_union::builder().build(); std::uninitialized_fill_n(sketch_ptr, 1, sketch); } // Update phase template <typename T> void AggregateFunctions::DsThetaUpdate(FunctionContext* ctx, const T& src, StringVal* dst) { if (src.is_null) return; DCHECK(!dst->is_null); DCHECK_EQ(dst->len, sizeof(datasketches::theta_union)); datasketches::theta_union* sketch_ptr = reinterpret_cast<datasketches::theta_union*>(dst->ptr); // need add new update functionality in theta_union sketch_ptr->update(src.val); } // Serialize phase StringVal AggregateFunctions::DsThetaSerialize(FunctionContext* ctx, const StringVal& src) { DCHECK(!src.is_null); DCHECK_EQ(src.len, sizeof(datasketches::theta_union)); datasketches::theta_union* union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr); StringVal dst = SerializeDsThetaUnion(ctx, *union_ptr); ctx->Free(src.ptr); return dst; } // Merge phase void AggregateFunctions::DsThetaMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) { DCHECK(!src.is_null); DCHECK(!dst->is_null); DCHECK_EQ(dst->len, sizeof(datasketches::theta_union)); // Note, 'src' is a serialized compact_theta_sketch and not a serialized theta_union. auto src_sketch = datasketches::compact_theta_sketch::deserialize((void*)src.ptr, src.len); datasketches::theta_union* dst_union_ptr = reinterpret_cast<datasketches::theta_union*>(dst->ptr); dst_union_ptr->update(src_sketch); } // Finalize phase, return estimate value. BigIntVal AggregateFunctions::DsThetaFinalize(FunctionContext* ctx, const StringVal& src) { DCHECK(!src.is_null); DCHECK_EQ(src.len, sizeof(datasketches::theta_union)); datasketches::theta_union* sketch_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr); auto sketch = sketch_ptr->get_result(); ctx->Free(src.ptr); return sketch.get_estimate(); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
