This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 246fd61  add hyperloglog implementation (`add` and `count`) (#1095)
246fd61 is described below

commit 246fd615e11e11c1186a5ea65bcc07ce7fd44c02
Author: Jiayu Liu <[email protected]>
AuthorDate: Tue Oct 12 00:48:32 2021 +0800

    add hyperloglog implementation (`add` and `count`) (#1095)
---
 datafusion/src/physical_plan/hyperloglog/mod.rs | 303 ++++++++++++++++++++++++
 datafusion/src/physical_plan/mod.rs             |   3 +-
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_plan/hyperloglog/mod.rs 
b/datafusion/src/physical_plan/hyperloglog/mod.rs
new file mode 100644
index 0000000..25e5213
--- /dev/null
+++ b/datafusion/src/physical_plan/hyperloglog/mod.rs
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # HyperLogLog
+//!
+//! `hyperloglog` is a module that contains a modified version
+//! of [Redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c),
+//! adapted under strong assumptions about how it is used within
+//! DataFusion so that the [`approx_distinct`] function can be
+//! implemented efficiently.
+//!
+//! Specifically, like Redis's version, this HLL structure uses
+//! 2**14 = 16384 registers, which means the standard error is
+//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, each register takes
+//! up a full [`u8`] instead of being packed into 6 bits, which avoids
+//! the tricky bit-shifting techniques used in the original version.
+//! This increases memory usage from 12 KiB to 16 KiB.
+//! Also, only the dense representation is implemented, so there is no
+//! automatic conversion from a sparse representation, largely to
+//! simplify the code.
+//!
+//! This module also borrows some code structure from 
[pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
+
+// TODO remove this when hooked up with the rest
+#![allow(dead_code)]
+
+use ahash::{AHasher, RandomState};
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::marker::PhantomData;
+
/// Number of hash bits used to select a register. A larger `HLL_P` means
/// more registers (`2**HLL_P`) and therefore a smaller standard error.
const HLL_P: usize = 14;
/// Number of hash bits left after the `HLL_P` register-index bits; the
/// trailing zeros of these bits determine a register's rank.
const HLL_Q: usize = 64 - HLL_P;
/// Total number of registers, `2**HLL_P`.
const NUM_REGISTERS: usize = 1 << HLL_P;
/// Mask selecting the low `HLL_P` bits of a hash, i.e. the register index.
const HLL_P_MASK: u64 = NUM_REGISTERS as u64 - 1;
+
+#[derive(Clone, Debug)]
+pub(crate) struct HyperLogLog<T>
+where
+    T: Hash + ?Sized,
+{
+    registers: [u8; NUM_REGISTERS],
+    phantom: PhantomData<T>,
+}
+
+/// fixed seed for the hashing so that values are consistent across runs
+const SEED: RandomState = RandomState::with_seeds(
+    0x885f6cab121d01a3_u64,
+    0x71e4379f2976ad8f_u64,
+    0xbf30173dd28a8816_u64,
+    0x0eaea5d736d733a4_u64,
+);
+
+impl<T> HyperLogLog<T>
+where
+    T: Hash + ?Sized,
+{
+    /// Creates a new, empty HyperLogLog.
+    pub fn new() -> Self {
+        let registers = [0; NUM_REGISTERS];
+        Self {
+            registers,
+            phantom: PhantomData,
+        }
+    }
+
+    /// choice of hash function: ahash is already an dependency
+    /// and it fits the requirements of being a 64bit hash with
+    /// reasonable performance.
+    #[inline]
+    fn hash_value(&self, obj: &T) -> u64 {
+        let mut hasher: AHasher = SEED.build_hasher();
+        obj.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    /// Adds an element to the HyperLogLog.
+    pub fn add(&mut self, obj: &T) {
+        let hash = self.hash_value(obj);
+        let index = (hash & HLL_P_MASK) as usize;
+        let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
+        self.registers[index] = self.registers[index].max(p as u8);
+    }
+
+    /// Get the register histogram (each value in register index into
+    /// the histogram; u32 is enough because we only have 2**14=16384 registers
+    #[inline]
+    fn get_histogram(&self) -> [u32; HLL_Q + 2] {
+        let mut histogram = [0; HLL_Q + 2];
+        // hopefully this can be unrolled
+        for r in self.registers {
+            histogram[r as usize] += 1;
+        }
+        histogram
+    }
+
+    /// Guess the number of unique elements seen by the HyperLogLog.
+    pub fn count(&self) -> usize {
+        let histogram = self.get_histogram();
+        let m = NUM_REGISTERS as f64;
+        let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
+        for i in histogram[1..=HLL_Q].iter().rev() {
+            z += *i as f64;
+            z *= 0.5;
+        }
+        z += m * hll_sigma(histogram[0] as f64 / m);
+        (0.5 / 2_f64.ln() * m * m / z).round() as usize
+    }
+}
+
/// Helper function sigma as defined in Otmar Ertl, "New cardinality
/// estimation algorithms for HyperLogLog sketches" (arXiv:1702.01284).
///
/// Evaluates the series `x + sum_{k >= 1} x^(2^k) * 2^(k-1)` term by term
/// until adding the next term no longer changes the floating-point sum.
/// Returns positive infinity for `x == 1.0`, where the series diverges.
#[inline]
fn hll_sigma(x: f64) -> f64 {
    if x == 1.0 {
        return f64::INFINITY;
    }
    let mut sum = x;
    let mut power = x;
    let mut weight = 1.0;
    loop {
        // `power` walks through x^2, x^4, x^8, ... while `weight` doubles.
        power *= power;
        let next = sum + power * weight;
        weight += weight;
        if next == sum {
            return next;
        }
        sum = next;
    }
}
+
/// Helper function tau as defined in Otmar Ertl, "New cardinality
/// estimation algorithms for HyperLogLog sketches" (arXiv:1702.01284).
///
/// Evaluates `(1 - x - sum_{k >= 1} (1 - x^(2^-k))^2 * 2^-k) / 3`, adding
/// terms until the next one no longer changes the floating-point sum.
/// Returns 0 for both `x == 0.0` and `x == 1.0`.
#[inline]
fn hll_tau(x: f64) -> f64 {
    if x == 0.0 || x == 1.0 {
        return 0.0;
    }
    let mut root = x;
    let mut weight = 1.0;
    let mut acc = 1.0 - x;
    loop {
        // `root` walks through x^(1/2), x^(1/4), ... while `weight` halves.
        root = root.sqrt();
        weight *= 0.5;
        let next = acc - (1.0 - root).powi(2) * weight;
        if next == acc {
            return next / 3.0;
        }
        acc = next;
    }
}
+
+impl<T> Extend<T> for HyperLogLog<T>
+where
+    T: Hash,
+{
+    fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
+        for elem in iter {
+            self.add(&elem);
+        }
+    }
+}
+
+impl<'a, T> Extend<&'a T> for HyperLogLog<T>
+where
+    T: 'a + Hash + ?Sized,
+{
+    fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
+        for elem in iter {
+            self.add(elem);
+        }
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::{HyperLogLog, NUM_REGISTERS};

    /// Asserts that `got` is within six standard errors of `expected`.
    fn compare_with_delta(got: usize, expected: usize) {
        let expected = expected as f64;
        let diff = (got as f64) - expected;
        let diff = diff.abs() / expected;
        // times 6 because we want the tests to be stable
        // so we allow a rather large margin of error
        // this is adopted from redis's unit test version as well
        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
        // `margin` is a fraction, so scale it by 100 before labelling it a percentage.
        assert!(
            diff <= margin,
            "{} is not within {}% of {}, i.e. outside ({}, {})",
            got,
            margin * 100.0,
            expected,
            expected * (1.0 - margin),
            expected * (1.0 + margin)
        );
    }

    macro_rules! sized_number_test {
        ($SIZE: expr, $T: tt) => {{
            let mut hll = HyperLogLog::<$T>::new();
            for i in 0..$SIZE {
                hll.add(&i);
            }
            compare_with_delta(hll.count(), $SIZE);
        }};
    }

    macro_rules! typed_large_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u64);
            sized_number_test!($SIZE, u128);
            sized_number_test!($SIZE, i64);
            sized_number_test!($SIZE, i128);
        }};
    }

    macro_rules! typed_number_test {
        ($SIZE: expr) => {{
            sized_number_test!($SIZE, u16);
            sized_number_test!($SIZE, u32);
            sized_number_test!($SIZE, i16);
            sized_number_test!($SIZE, i32);
            typed_large_number_test!($SIZE);
        }};
    }

    #[test]
    fn test_empty() {
        let hll = HyperLogLog::<u64>::new();
        assert_eq!(hll.count(), 0);
    }

    #[test]
    fn test_one() {
        let mut hll = HyperLogLog::<u64>::new();
        hll.add(&1);
        assert_eq!(hll.count(), 1);
    }

    #[test]
    fn test_number_100() {
        typed_number_test!(100);
    }

    #[test]
    fn test_number_1k() {
        typed_number_test!(1_000);
    }

    #[test]
    fn test_number_10k() {
        typed_number_test!(10_000);
    }

    #[test]
    fn test_number_100k() {
        typed_large_number_test!(100_000);
    }

    #[test]
    fn test_number_1m() {
        typed_large_number_test!(1_000_000);
    }

    #[test]
    fn test_u8() {
        let mut hll = HyperLogLog::<[u8]>::new();
        for i in 0..1000 {
            let s = i.to_string();
            let b = s.as_bytes();
            hll.add(b);
        }
        compare_with_delta(hll.count(), 1000);
    }

    #[test]
    fn test_string() {
        let mut hll = HyperLogLog::<String>::new();
        hll.extend((0..1000).map(|i| i.to_string()));
        compare_with_delta(hll.count(), 1000);
    }
}
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index f0b5622..fef2af5 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -102,7 +102,7 @@ pub struct Statistics {
     /// Statistics on a column level
     pub column_statistics: Option<Vec<ColumnStatistics>>,
     /// If true, any field that is `Some(..)` is the actual value in the data 
provided by the operator (it is not
-    /// an estimate). Any or all other fields might still be None, in which 
case no information is known.  
+    /// an estimate). Any or all other fields might still be None, in which 
case no information is known.
     /// if false, any field that is `Some(..)` may contain an inexact estimate 
and may not be the actual value.
     pub is_exact: bool,
 }
@@ -625,6 +625,7 @@ pub mod functions;
 pub mod hash_aggregate;
 pub mod hash_join;
 pub mod hash_utils;
+pub(crate) mod hyperloglog;
 pub mod join_utils;
 pub mod json;
 pub mod limit;

Reply via email to