This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e49f31e6f feat: add method for async read bloom filter (#4917)
6e49f31e6f is described below

commit 6e49f31e6fc992cfd93e84bd6f2d3d6b563b62a4
Author: Hengfei Yang <[email protected]>
AuthorDate: Thu Oct 12 09:30:36 2023 -0500

    feat: add method for async read bloom filter (#4917)
    
    * feat: add method for async read bloomfilter
    
    * fix: compatible for bloom filter length
    
    * test: add unit tests for read bloom filter
    
    * fix: format code for unit test
---
 parquet/src/arrow/async_reader/mod.rs | 147 ++++++++++++++++++++++++++++++++--
 parquet/src/bloom_filter/mod.rs       |   8 +-
 2 files changed, 146 insertions(+), 9 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index c749d4deeb..4b3eebf2e6 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@
 
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-
 use std::io::SeekFrom;
 use std::ops::Range;
 use std::pin::Pin;
@@ -88,7 +87,6 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
@@ -102,15 +100,18 @@ use crate::arrow::arrow_reader::{
 };
 use crate::arrow::ProjectionMask;
 
+use crate::bloom_filter::{
+    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
 use crate::column::page::{PageIterator, PageReader};
-
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
-use crate::format::PageLocation;
-
 use crate::file::FOOTER_SIZE;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, 
PageLocation,
+};
 
 mod metadata;
 pub use metadata::*;
@@ -302,6 +303,71 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as 
projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+
+        let offset: usize = if let Some(offset) = 
column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is 
invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset..offset + length as 
usize),
+            None => self
+                .input
+                .0
+                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+        }
+        .await?;
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset as u64, 
buffer.clone())?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm 
enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression 
enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+
+        let bitset = match column_metadata.bloom_filter_length() {
+            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
+            None => {
+                let bitset_length: usize = 
header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is 
invalid".to_string())
+                })?;
+                self.input
+                    .0
+                    .get_bytes(
+                        bitset_offset as usize..bitset_offset as usize + 
bitset_length,
+                    )
+                    .await?
+            }
+        };
+        Ok(Some(Sbbf::new(&bitset)))
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
@@ -1540,4 +1606,75 @@ mod tests {
         assert_ne!(1024, file_rows);
         assert_eq!(stream.batch_size, file_rows);
     }
+
+    #[tokio::test]
+    async fn test_get_row_group_column_bloom_filter_without_length() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        test_get_row_group_column_bloom_filter(data, false).await;
+    }
+
+    #[tokio::test]
+    async fn test_get_row_group_column_bloom_filter_with_length() {
+        // convert to new parquet file with bloom_filter_length
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        let metadata = parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+        let schema = builder.schema().clone();
+        let stream = builder.build().unwrap();
+        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+
+        let mut parquet_data = Vec::new();
+        let props = WriterProperties::builder()
+            .set_bloom_filter_enabled(true)
+            .build();
+        let mut writer =
+            ArrowWriter::try_new(&mut parquet_data, schema, 
Some(props)).unwrap();
+        for batch in batches {
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        // test the new parquet file
+        test_get_row_group_column_bloom_filter(parquet_data.into(), 
true).await;
+    }
+
+    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: 
bool) {
+        let metadata = parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        let column = row_group.column(0);
+        assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let sbbf = builder
+            .get_row_group_column_bloom_filter(0, 0)
+            .await
+            .unwrap()
+            .unwrap();
+        assert!(sbbf.check(&"Hello"));
+        assert!(!sbbf.check(&"Hello_Not_Exists"));
+    }
 }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index a3807eb370..e98aee9fd2 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -132,11 +132,11 @@ impl std::ops::IndexMut<usize> for Block {
 #[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
-const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
 
 /// given an initial offset, and a byte buffer, try to read out a bloom filter 
header and return
 /// both the header and the offset after it (for bitset).
-fn chunk_read_bloom_filter_header_and_offset(
+pub(crate) fn chunk_read_bloom_filter_header_and_offset(
     offset: u64,
     buffer: Bytes,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
@@ -147,7 +147,7 @@ fn chunk_read_bloom_filter_header_and_offset(
 /// given a [Bytes] buffer, try to read out a bloom filter header and return 
both the header and
 /// length of the header.
 #[inline]
-fn read_bloom_filter_header_and_length(
+pub(crate) fn read_bloom_filter_header_and_length(
     buffer: Bytes,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
     let total_length = buffer.len();
@@ -199,7 +199,7 @@ impl Sbbf {
         Self::new(&bitset)
     }
 
-    fn new(bitset: &[u8]) -> Self {
+    pub(crate) fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
             .map(|chunk| {

Reply via email to