This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch bloom-filter-reader in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 3cc48d34d9bcc9e0922cd678ef34be6a26ff55b6 Author: Jiayu Liu <[email protected]> AuthorDate: Wed Nov 9 17:47:57 2022 +0800 bloom filter reader --- parquet/src/bloom_filter/mod.rs | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index e455e6252..9166eadc3 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -18,7 +18,20 @@ //! Bloom filter implementation specific to Parquet, as described //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) +use crate::errors::ParquetError; +use crate::file::metadata::ColumnChunkMetaData; +use crate::file::reader::ChunkReader; +use crate::format::{ + BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader, + BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, + RowGroup, +}; use std::hash::Hasher; +use std::io::Cursor; +use std::io::IoSliceMut; +use std::io::{Read, Seek, SeekFrom}; +use std::iter; +use thrift::protocol::TCompactInputProtocol; use twox_hash::XxHash64; const SALT: [u32; 8] = [ @@ -72,6 +85,53 @@ fn block_check(block: &Block, hash: u32) -> bool { pub(crate) struct Sbbf(Vec<Block>); impl Sbbf { + pub fn read_from_column_chunk<R: Read + Seek>( + column_metadata: &ColumnChunkMetaData, + mut reader: &mut R, + ) -> Result<Self, ParquetError> { + let offset = column_metadata.bloom_filter_offset().ok_or_else(|| { + ParquetError::General("Bloom filter offset is not set".to_string()) + })? as u64; + reader.seek(SeekFrom::Start(offset))?; + // deserialize header + let mut prot = TCompactInputProtocol::new(&mut reader); + let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?; + + match header.algorithm { + BloomFilterAlgorithm::BLOCK(_) => { + // this match exists to future proof the singleton algorithm enum + } + } + match header.compression { + BloomFilterCompression::UNCOMPRESSED(_) => { + // this match exists to future proof the singleton compression enum + } + } + match header.hash { + BloomFilterHash::XXHASH(_) => { + // this match exists to future proof the singleton hash enum + } + } + let length: usize = header.num_bytes.try_into().map_err(|_| { + ParquetError::General("Bloom filter length is invalid".to_string()) + })?; + let mut buffer = vec![0_u8; length]; + reader.read_exact(&mut buffer).map_err(|e| { + ParquetError::General(format!("Could not read bloom filter: {}", e)) + })?; + let data = buffer + .chunks_exact(4 * 8) + .map(|chunk| { + let mut block = [0_u32; 8]; + for (i, word) in chunk.chunks_exact(4).enumerate() { + block[i] = u32::from_le_bytes(word.try_into().unwrap()); + } + block + }) + .collect::<Vec<Block>>(); + Ok(Self(data)) + } + #[inline] fn hash_to_block_index(&self, hash: u64) -> usize { // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul
