This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 246fd61 add hyperloglog implementation (`add` and `count`) (#1095)
246fd61 is described below
commit 246fd615e11e11c1186a5ea65bcc07ce7fd44c02
Author: Jiayu Liu <[email protected]>
AuthorDate: Tue Oct 12 00:48:32 2021 +0800
add hyperloglog implementation (`add` and `count`) (#1095)
---
datafusion/src/physical_plan/hyperloglog/mod.rs | 303 ++++++++++++++++++++++++
datafusion/src/physical_plan/mod.rs | 3 +-
2 files changed, 305 insertions(+), 1 deletion(-)
diff --git a/datafusion/src/physical_plan/hyperloglog/mod.rs b/datafusion/src/physical_plan/hyperloglog/mod.rs
new file mode 100644
index 0000000..25e5213
--- /dev/null
+++ b/datafusion/src/physical_plan/hyperloglog/mod.rs
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # HyperLogLog
+//!
+//! `hyperloglog` is a module that contains a modified version
+//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c)
+//! with some modifications based on strong assumptions about its usage
+//! within datafusion, so that the [`approx_distinct`] function can
+//! be efficiently implemented.
+//!
+//! Specifically, like Redis's version, this HLL structure uses
+//! 2**14 = 16384 registers, which means the standard error is
+//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, each register takes
+//! up a full [`u8`] instead of being packed into a raw int*, which saves
+//! some of the tricky bit shifting techniques used in the original version.
+//! This increases memory usage from 12 KiB to 16 KiB.
+//! Also only the dense version is adopted, so there's no automatic
+//! conversion, largely to simplify the code.
+//!
+//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
+
+// TODO remove this when hooked up with the rest
+#![allow(dead_code)]
+
+use ahash::{AHasher, RandomState};
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::marker::PhantomData;
+
+/// The greater P is, the smaller the error.
+const HLL_P: usize = 14_usize;
+/// the number of bits of the hash value used for determining the number of leading zeros
+const HLL_Q: usize = 64_usize - HLL_P; // the 64 - 14 = 50 hash bits left after the index bits
+const NUM_REGISTERS: usize = 1_usize << HLL_P; // 2**14 = 16384 registers
+/// Mask that keeps the low `HLL_P` bits of a hash, i.e. the index into the registers.
+const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1;
+
+#[derive(Clone, Debug)]
+pub(crate) struct HyperLogLog<T> // cardinality sketch over hashed values of type `T`
+where
+ T: Hash + ?Sized,
+{
+ registers: [u8; NUM_REGISTERS], // one byte per register: the largest rank `add` has stored there (0 = untouched)
+ phantom: PhantomData<T>, // records the element type without storing any `T`
+}
+
+/// Fixed seeds for the hasher, so that hash values are consistent across runs.
+const SEED: RandomState = RandomState::with_seeds(
+ 0x885f6cab121d01a3_u64,
+ 0x71e4379f2976ad8f_u64,
+ 0xbf30173dd28a8816_u64,
+ 0x0eaea5d736d733a4_u64,
+);
+
+impl<T> HyperLogLog<T>
+where
+ T: Hash + ?Sized,
+{
+ /// Creates a new, empty HyperLogLog with every register set to zero.
+ pub fn new() -> Self {
+ let registers = [0; NUM_REGISTERS];
+ Self {
+ registers,
+ phantom: PhantomData,
+ }
+ }
+
+ /// Hashes `obj` to 64 bits with an [`AHasher`] built from the fixed [`SEED`].
+ /// ahash was chosen because it is already a dependency and it fits
+ /// the requirement of being a 64-bit hash with reasonable performance.
+ #[inline]
+ fn hash_value(&self, obj: &T) -> u64 {
+ let mut hasher: AHasher = SEED.build_hasher();
+ obj.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ /// Adds an element: its register keeps the maximum of the old value and the element's rank.
+ pub fn add(&mut self, obj: &T) {
+ let hash = self.hash_value(obj);
+ let index = (hash & HLL_P_MASK) as usize; // low HLL_P bits select the register
+ let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1; // rank of the remaining bits; the sentinel bit at HLL_Q caps it at HLL_Q + 1
+ self.registers[index] = self.registers[index].max(p as u8); // rank <= HLL_Q + 1 = 51, so the u8 cast is lossless
+ }
+
+ /// Builds the register histogram: entry `k` counts the registers whose value is `k`.
+ /// `u32` is enough because we only have 2**14 = 16384 registers.
+ #[inline]
+ fn get_histogram(&self) -> [u32; HLL_Q + 2] {
+ let mut histogram = [0; HLL_Q + 2]; // one slot per possible register value 0..=HLL_Q + 1
+ // single pass over all registers; hopefully this can be unrolled
+ for r in self.registers {
+ histogram[r as usize] += 1;
+ }
+ histogram
+ }
+
+ /// Estimates the number of unique elements seen by the HyperLogLog.
+ pub fn count(&self) -> usize {
+ let histogram = self.get_histogram();
+ let m = NUM_REGISTERS as f64;
+ let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m); // tau of the fraction of registers below the maximum value HLL_Q + 1
+ for i in histogram[1..=HLL_Q].iter().rev() { // fold in the counts for values HLL_Q down to 1
+ z += *i as f64;
+ z *= 0.5; // halving after each step weights the count of value k by 2^-k
+ }
+ z += m * hll_sigma(histogram[0] as f64 / m); // term for registers still at zero; infinite for an empty sketch
+ (0.5 / 2_f64.ln() * m * m / z).round() as usize // m^2 / (2 ln(2) z), rounded to the nearest integer
+ }
+}
+
+/// Helper function sigma, computed as `x + sum_{k>=1} x^(2^k) * 2^(k-1)`, as defined in
+/// "New cardinality estimation algorithms for HyperLogLog sketches",
+/// Otmar Ertl, arXiv:1702.01284. Returns infinity when `x == 1`.
+#[inline]
+fn hll_sigma(x: f64) -> f64 {
+ if x == 1. {
+ f64::INFINITY
+ } else {
+ let mut y = 1.0; // weight of the next term: 1, 2, 4, ...
+ let mut z = x; // running sum, seeded with the leading `x` term
+ let mut x = x;
+ loop {
+ x *= x; // successive squaring: x^2, x^4, x^8, ...
+ let z_prime = z;
+ z += x * y;
+ y += y;
+ if z_prime == z { // stop once a term no longer changes the f64 sum
+ break;
+ }
+ }
+ z
+ }
+}
+
+/// Helper function tau, computed as `(1 - x - sum_{k>=1} (1 - x^(2^-k))^2 * 2^-k) / 3`, as defined in
+/// "New cardinality estimation algorithms for HyperLogLog sketches",
+/// Otmar Ertl, arXiv:1702.01284. Returns 0 when `x` is 0 or 1.
+#[inline]
+fn hll_tau(x: f64) -> f64 {
+ if x == 0.0 || x == 1.0 {
+ 0.0
+ } else {
+ let mut y = 1.0; // weight of the next term once halved: 1/2, 1/4, 1/8, ...
+ let mut z = 1.0 - x; // running value, seeded with the leading `1 - x` term
+ let mut x = x;
+ loop {
+ x = x.sqrt(); // successive square roots: x^(1/2), x^(1/4), ...
+ let z_prime = z;
+ y *= 0.5;
+ z -= (1.0 - x).powi(2) * y;
+ if z_prime == z { // stop once a term no longer changes the f64 value
+ break;
+ }
+ }
+ z / 3.0
+ }
+}
+
+impl<T> Extend<T> for HyperLogLog<T> // bulk insertion of owned items
+where
+ T: Hash,
+{
+ fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
+ for elem in iter {
+ self.add(&elem); // `add` takes a reference, so each owned item is borrowed for the call
+ }
+ }
+}
+
+impl<'a, T> Extend<&'a T> for HyperLogLog<T> // bulk insertion of borrowed items; `T` may be unsized
+where
+ T: 'a + Hash + ?Sized,
+{
+ fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
+ for elem in iter {
+ self.add(elem); // items are already references, so they are passed straight through
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{HyperLogLog, NUM_REGISTERS};
+
+ fn compare_with_delta(got: usize, expected: usize) { // asserts that `got` is within the allowed relative error of `expected`
+ let expected = expected as f64;
+ let diff = (got as f64) - expected;
+ let diff = diff.abs() / expected; // relative error
+ // Allow 6x the theoretical standard error (1.04 / sqrt(NUM_REGISTERS)) because
+ // we want the tests to be stable, hence the rather large margin of error;
+ // this is adopted from redis's unit test version as well.
+ let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
+ assert!(
+ diff <= margin,
+ "{} is not near {} percent of {} which is ({}, {})", // NOTE(review): `margin` is printed as a fraction, not a percentage
+ got,
+ margin,
+ expected,
+ expected * (1.0 - margin),
+ expected * (1.0 + margin)
+ );
+ }
+
+ macro_rules! sized_number_test { // adds 0..$SIZE as values of type $T, then checks the estimate against $SIZE
+ ($SIZE: expr, $T: tt) => {{
+ let mut hll = HyperLogLog::<$T>::new();
+ for i in 0..$SIZE {
+ hll.add(&i);
+ }
+ compare_with_delta(hll.count(), $SIZE);
+ }};
+ }
+
+ macro_rules! typed_large_number_test { // runs the sized test for the 64- and 128-bit integer types
+ ($SIZE: expr) => {{
+ sized_number_test!($SIZE, u64);
+ sized_number_test!($SIZE, u128);
+ sized_number_test!($SIZE, i64);
+ sized_number_test!($SIZE, i128);
+ }};
+ }
+
+ macro_rules! typed_number_test { // additionally covers the 16- and 32-bit integer types
+ ($SIZE: expr) => {{
+ sized_number_test!($SIZE, u16);
+ sized_number_test!($SIZE, u32);
+ sized_number_test!($SIZE, i16);
+ sized_number_test!($SIZE, i32);
+ typed_large_number_test!($SIZE);
+ }};
+ }
+
+ #[test]
+ fn test_empty() {
+ let hll = HyperLogLog::<u64>::new();
+ assert_eq!(hll.count(), 0);
+ }
+
+ #[test]
+ fn test_one() {
+ let mut hll = HyperLogLog::<u64>::new();
+ hll.add(&1);
+ assert_eq!(hll.count(), 1);
+ }
+
+ #[test]
+ fn test_number_100() {
+ typed_number_test!(100);
+ }
+
+ #[test]
+ fn test_number_1k() {
+ typed_number_test!(1_000);
+ }
+
+ #[test]
+ fn test_number_10k() {
+ typed_number_test!(10_000);
+ }
+
+ #[test]
+ fn test_number_100k() {
+ typed_large_number_test!(100_000);
+ }
+
+ #[test]
+ fn test_number_1m() {
+ typed_large_number_test!(1_000_000);
+ }
+
+ #[test]
+ fn test_u8() {
+ let mut hll = HyperLogLog::<[u8]>::new();
+ for i in 0..1000 {
+ let s = i.to_string();
+ let b = s.as_bytes();
+ hll.add(b);
+ }
+ compare_with_delta(hll.count(), 1000);
+ }
+
+ #[test]
+ fn test_string() {
+ let mut hll = HyperLogLog::<String>::new();
+ hll.extend((0..1000).map(|i| i.to_string()));
+ compare_with_delta(hll.count(), 1000);
+ }
+}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index f0b5622..fef2af5 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -102,7 +102,7 @@ pub struct Statistics {
/// Statistics on a column level
pub column_statistics: Option<Vec<ColumnStatistics>>,
/// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
- /// an estimate). Any or all other fields might still be None, in which case no information is known.
+ /// an estimate). Any or all other fields might still be None, in which case no information is known.
/// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
pub is_exact: bool,
}
@@ -625,6 +625,7 @@ pub mod functions;
pub mod hash_aggregate;
pub mod hash_join;
pub mod hash_utils;
+pub(crate) mod hyperloglog;
pub mod join_utils;
pub mod json;
pub mod limit;