haohuaijin commented on code in PR #22768:
URL: https://github.com/apache/datafusion/pull/22768#discussion_r3362397570


##########
datafusion/functions-aggregate/src/approx_distinct.rs:
##########
@@ -294,6 +297,371 @@ where
     default_accumulator_impl!();
 }
 
+/// Maximum number of distinct hashes kept in the sparse representation of a
+/// per-group sketch before it is promoted to a dense [`HyperLogLog`].
+///
+/// A dense sketch always occupies [`NUM_REGISTERS`] (16 KiB) regardless of how
+/// many values it has seen. The vast majority of groups in a high-cardinality
+/// `GROUP BY` only observe a handful of distinct values, so keeping their state
+/// as a small list of hashes saves a huge amount of memory (both while
+/// aggregating and when serializing the partial state for the final phase).
+///
+/// The sparse list is only compacted (sorted and deduplicated) once it reaches
+/// `2 * SPARSE_LIMIT` entries, so it may transiently hold up to twice this many
+/// hashes before the limit is checked.
+const SPARSE_LIMIT: usize = 256;
+
+/// Per-group HyperLogLog state used by [`HllGroupsAccumulator`].
+///
+/// Starts out as a compact list of the hashes observed for the group and only
+/// switches to a full dense [`HyperLogLog`] once it has seen more than
+/// [`SPARSE_LIMIT`] distinct values. Folding the stored hashes into a dense
+/// sketch produces exactly the same registers as adding the original values one
+/// by one, so the cardinality estimate is identical to the per-group
+/// [`Accumulator`] path.
+#[derive(Clone, Debug)]
+enum GroupHll {
+    /// Hashes seen so far. The list is only sorted and deduplicated when it is
+    /// compacted, so it may contain duplicates between compactions.
+    Sparse(Vec<u64>),
+    /// Full dense sketch, boxed so that the `Sparse` variant stays small.
+    Dense(Box<HyperLogLog<u8>>),
+}
+
+impl Default for GroupHll {
+    /// A new group starts in the sparse representation with an empty hash
+    /// list; `Vec::new` does not allocate until the first hash is pushed.
+    fn default() -> Self {
+        GroupHll::Sparse(Vec::new())
+    }
+}
+
+impl GroupHll {
+    /// Add a pre-computed hash, returning the change in heap-allocated bytes 
so
+    /// the accumulator can track its memory usage incrementally.
+    #[inline]
+    fn add_hash(&mut self, hash: u64) -> isize {
+        match self {
+            GroupHll::Dense(hll) => {
+                hll.add_hashed(hash);
+                0
+            }
+            GroupHll::Sparse(v) => {
+                let cap_before = v.capacity();
+                v.push(hash);
+                if v.len() >= 2 * SPARSE_LIMIT {
+                    return self.compact_or_promote(cap_before);
+                }
+                ((v.capacity() - cap_before) * size_of::<u64>()) as isize
+            }
+        }
+    }
+
+    /// Deduplicate the sparse hash list and, if it still exceeds
+    /// [`SPARSE_LIMIT`] distinct values, promote it to a dense sketch.
+    #[cold]
+    fn compact_or_promote(&mut self, cap_before: usize) -> isize {
+        let GroupHll::Sparse(v) = self else {
+            return 0;
+        };
+        v.sort_unstable();
+        v.dedup();
+        if v.len() > SPARSE_LIMIT {
+            let mut hll = HyperLogLog::<u8>::new();
+            for &h in v.iter() {
+                hll.add_hashed(h);
+            }
+            *self = GroupHll::Dense(Box::new(hll));
+            (NUM_REGISTERS as isize) - ((cap_before * size_of::<u64>()) as 
isize)
+        } else {
+            // capacity is unchanged by sort/dedup
+            0
+        }
+    }
+
+    /// Merge a serialized state (produced by [`Self::serialize`] or by the
+    /// per-group [`Accumulator`]) into this sketch.
+    fn merge_serialized(&mut self, bytes: &[u8]) -> Result<isize> {
+        if bytes.is_empty() {
+            return Ok(0);
+        }
+        if bytes.len() == NUM_REGISTERS {
+            let other: HyperLogLog<u8> = bytes.try_into()?;
+            Ok(self.merge_dense(&other))
+        } else {
+            debug_assert_eq!(bytes.len() % size_of::<u64>(), 0);

Review Comment:
   Added in [db8baf2](https://github.com/apache/datafusion/pull/22768/commits/db8baf2ee4f51086b71a601d65322d09608565fa).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to