This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6e49f31e6f feat: add method for async read bloom filter (#4917)
6e49f31e6f is described below
commit 6e49f31e6fc992cfd93e84bd6f2d3d6b563b62a4
Author: Hengfei Yang <[email protected]>
AuthorDate: Thu Oct 12 09:30:36 2023 -0500
feat: add method for async read bloom filter (#4917)
* feat: add method for async read bloom filter
* fix: compatibility with bloom filter length
* test: add unit tests for read bloom filter
* fix: format code for unit test
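
For context, a minimal usage sketch of the new API (not part of this commit;
the file name "data.parquet" and the probe value "needle" are made up, and the
parquet crate's `async` feature is assumed):

    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() -> parquet::errors::Result<()> {
        // Any source implementing AsyncRead + AsyncSeek works here.
        let file = tokio::fs::File::open("data.parquet").await?;
        let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

        // Probe the bloom filter of column 0 in row group 0; `None` means
        // that column has no bloom filter.
        if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
            if !sbbf.check(&"needle") {
                println!("value is definitely absent from row group 0");
            }
        }
        Ok(())
    }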
---
parquet/src/arrow/async_reader/mod.rs | 147 ++++++++++++++++++++++++++++++++--
parquet/src/bloom_filter/mod.rs | 8 +-
2 files changed, 146 insertions(+), 9 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c749d4deeb..4b3eebf2e6 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
-
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
@@ -88,7 +87,6 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
-
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
@@ -102,15 +100,18 @@ use crate::arrow::arrow_reader::{
};
use crate::arrow::ProjectionMask;
+use crate::bloom_filter::{
+ chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
use crate::column::page::{PageIterator, PageReader};
-
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
-use crate::format::PageLocation;
-
use crate::file::FOOTER_SIZE;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation,
+};
mod metadata;
pub use metadata::*;
@@ -302,6 +303,71 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
Self::new_builder(AsyncReader(input), metadata)
}
+ /// Read bloom filter for a column in a row group
+ /// Returns `None` if the column does not have a bloom filter
+ ///
+    /// We should call this function after other forms of pruning, such as
+    /// projection and predicate pushdown.
+ pub async fn get_row_group_column_bloom_filter(
+ &mut self,
+ row_group_idx: usize,
+ column_idx: usize,
+ ) -> Result<Option<Sbbf>> {
+ let metadata = self.metadata.row_group(row_group_idx);
+ let column_metadata = metadata.column(column_idx);
+
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+ })?
+ } else {
+ return Ok(None);
+ };
+
+ let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
+ None => self
+ .input
+ .0
+ .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+ }
+ .await?;
+
+ let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
+
+ match header.algorithm {
+ BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm enum
+ }
+ }
+ match header.compression {
+ BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression enum
+ }
+ }
+ match header.hash {
+ BloomFilterHash::XXHASH(_) => {
+ // this match exists to future proof the singleton hash enum
+ }
+ }
+
+ let bitset = match column_metadata.bloom_filter_length() {
+ Some(_) => buffer.slice((bitset_offset as usize - offset)..),
+ None => {
+                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is invalid".to_string())
+ })?;
+ self.input
+ .0
+ .get_bytes(
+                        bitset_offset as usize..bitset_offset as usize + bitset_length,
+ )
+ .await?
+ }
+ };
+ Ok(Some(Sbbf::new(&bitset)))
+ }
+
/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -1540,4 +1606,75 @@ mod tests {
assert_ne!(1024, file_rows);
assert_eq!(stream.batch_size, file_rows);
}
+
+ #[tokio::test]
+ async fn test_get_row_group_column_bloom_filter_without_length() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ test_get_row_group_column_bloom_filter(data, false).await;
+ }
+
+ #[tokio::test]
+ async fn test_get_row_group_column_bloom_filter_with_length() {
+ // convert to new parquet file with bloom_filter_length
+ let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ let metadata = parse_metadata(&data).unwrap();
+ let metadata = Arc::new(metadata);
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+ let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+ let schema = builder.schema().clone();
+ let stream = builder.build().unwrap();
+ let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+
+ let mut parquet_data = Vec::new();
+ let props = WriterProperties::builder()
+ .set_bloom_filter_enabled(true)
+ .build();
+ let mut writer =
+            ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+ for batch in batches {
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ // test the new parquet file
+        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
+ }
+
+    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+ let metadata = parse_metadata(&data).unwrap();
+ let metadata = Arc::new(metadata);
+
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+ assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+
+ let sbbf = builder
+ .get_row_group_column_bloom_filter(0, 0)
+ .await
+ .unwrap()
+ .unwrap();
+ assert!(sbbf.check(&"Hello"));
+ assert!(!sbbf.check(&"Hello_Not_Exists"));
+ }
}
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index a3807eb370..e98aee9fd2 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -132,11 +132,11 @@ impl std::ops::IndexMut<usize> for Block {
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
-const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
-fn chunk_read_bloom_filter_header_and_offset(
+pub(crate) fn chunk_read_bloom_filter_header_and_offset(
offset: u64,
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
@@ -147,7 +147,7 @@ fn chunk_read_bloom_filter_header_and_offset(
/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// length of the header.
#[inline]
-fn read_bloom_filter_header_and_length(
+pub(crate) fn read_bloom_filter_header_and_length(
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let total_length = buffer.len();
@@ -199,7 +199,7 @@ impl Sbbf {
Self::new(&bitset)
}
- fn new(bitset: &[u8]) -> Self {
+ pub(crate) fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {
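
As the new doc comment suggests, the filter is most useful for skipping row
groups after other pruning. A hedged sketch of that pattern (the helper name,
column index parameter, and string probe are illustrative, not part of this
commit; `with_row_groups` is the builder's existing row-group selection method):

    use parquet::arrow::async_reader::{
        AsyncFileReader, ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder,
    };

    /// Skip row groups whose bloom filter proves `value` is absent, then
    /// build the stream over the surviving groups.
    async fn prune_by_bloom_filter<T: AsyncFileReader + Send + 'static>(
        mut builder: ParquetRecordBatchStreamBuilder<T>,
        column_idx: usize,
        value: &str,
    ) -> parquet::errors::Result<ParquetRecordBatchStream<T>> {
        let mut keep = Vec::new();
        for rg in 0..builder.metadata().num_row_groups() {
            match builder
                .get_row_group_column_bloom_filter(rg, column_idx)
                .await?
            {
                // A negative check is definitive: the value is not in this group.
                Some(sbbf) if !sbbf.check(&value) => {}
                // No filter, or a (possibly false) positive: keep the group.
                _ => keep.push(rg),
            }
        }
        builder.with_row_groups(keep).build()
    }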