This is an automated email from the ASF dual-hosted git repository.

sdd pushed a commit to branch feature/arrow-reader-delete-file-retrieval
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
commit 50d552a03c78615d7e8ba5190c2b566f6b487b03
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Wed Jan 15 06:58:40 2025 +0000

    feat(wip): arrow reader delete file read
---
 crates/iceberg/src/arrow/reader.rs | 84 ++++++++++++++++++++++++++++++++++++--
 crates/iceberg/src/deletes.rs      | 25 ++++++++++++
 crates/iceberg/src/lib.rs          |  1 +
 crates/iceberg/src/scan.rs         | 13 ++++++
 4 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index b4e15821..91850c80 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -32,7 +32,8 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt, SinkExt};
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -47,10 +48,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, PrimitiveType, Schema};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
+use crate::spec::{DataContentType, Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
+use roaring::RoaringBitmap;
+use crate::deletes::Deletes;
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
@@ -257,6 +260,81 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    // retrieve all delete files concurrently from FileIO and parse them
+    // into `Deletes` objects
+    async fn get_deletes(
+        delete_file_entries: Vec<FileScanTaskDeleteFile>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<Vec<Deletes>> {
+        if delete_file_entries.is_empty() {
+            return Ok(vec![]);
+        }
+
+        futures::stream::iter(delete_file_entries.into_iter().map(Ok))
+            .map_ok(|entry| {
+                let file_io = file_io.clone();
+                async move {
+                    let FileScanTaskDeleteFile {
+                        file_path,
+                        file_type,
+                        ..
+                    } = entry;
+
+                    let record_batch_stream = Self::create_parquet_record_batch_stream_builder(
+                        &file_path,
+                        file_io,
+                        false,
+                    )
+                    .await?
+                    .build()?
+                    .map_err(|err| Error::new(ErrorKind::DataInvalid, err.to_string()).with_source(err))
+                    .boxed();
+
+                    match file_type {
+                        DataContentType::PositionDeletes => Self::parse_positional_delete_file(record_batch_stream).await,
+                        DataContentType::EqualityDeletes => Self::parse_equality_delete_file(record_batch_stream).await,
+                        _ => Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Expected equality or positional delete",
+                        )),
+                    }
+                }
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_collect()
+            .await
+    }
+
+    async fn parse_positional_delete_file(_record_batches: ArrowRecordBatchStream) -> Result<Deletes> {
+        todo!()
+    }
+
+    async fn parse_equality_delete_file(_record_batches: ArrowRecordBatchStream) -> Result<Deletes> {
+        todo!()
+    }
+
+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        // Start creating the record batch stream, which wraps the parquet file reader
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
     fn build_field_id_set_and_map(
         parquet_schema: &SchemaDescriptor,
         predicate: &BoundPredicate,
diff --git a/crates/iceberg/src/deletes.rs b/crates/iceberg/src/deletes.rs
new file mode 100644
index 00000000..268a2131
--- /dev/null
+++ b/crates/iceberg/src/deletes.rs
@@ -0,0 +1,25 @@
+use roaring::RoaringBitmap;
+use arrow_array::RecordBatch;
+
+// Represents a parsed Delete file that can be safely stored
+// in the Object Cache.
+pub(crate) enum Deletes {
+    // Positional delete files are parsed into a sorted set of
+    // deleted row indices, represented as a roaring bitmap.
+
+    // TODO: Ignoring the rows that are stored alongside the positions
+    //   in positional deletes for now. I think they're only used for statistics?
+    Vector(RoaringBitmap),
+
+    // Equality delete files are initially parsed solely as an
+    // unprocessed list of `RecordBatch`es from the equality
+    // delete files. I don't think we can do better than this by
+    // storing a Predicate, because the equality deletes use the
+    // field_id rather than the field name, so if we keep this as
+    // a Predicate then a field name change would break it.
+    // Similarly, I don't think we can store this as a BoundPredicate,
+    // as the column order could be different across different data
+    // files and so the accessor in the bound predicate could be
+    // invalid.
+    Equality(Vec<RecordBatch>),
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index eaecfea6..af4d9448 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -86,3 +86,4 @@ mod utils;
 pub mod writer;
 
 mod puffin;
+mod deletes;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 7a100b34..4e7a380c 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -933,6 +933,19 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 }
 
+/// A delete file to be applied as part of a file scan task.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct FileScanTaskDeleteFile {
+    /// The delete file path
+    pub file_path: String,
+
+    /// The delete file type: positional or equality
+    pub file_type: DataContentType,
+
+    /// The ID of the partition spec that the delete file was written under
+    pub partition_spec_id: i32,
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {
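
For reference, a minimal sketch of how the `parse_positional_delete_file` stub left as `todo!()` above might later be filled in, assuming the spec's position-delete layout (a `file_path` string column at index 0 and a `pos` long column at index 1). This is not part of the commit; error handling and per-data-file grouping are elided:

    async fn parse_positional_delete_file(mut record_batches: ArrowRecordBatchStream) -> Result<Deletes> {
        use arrow_array::Int64Array;

        let mut deleted_rows = RoaringBitmap::new();
        while let Some(batch) = record_batches.next().await {
            let batch = batch?;
            // Per the spec, column 0 is `file_path` and column 1 is `pos`.
            let positions = batch
                .column(1)
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "expected i64 `pos` column"))?;
            for pos in positions.iter().flatten() {
                // NB: RoaringBitmap stores u32s, so row indices beyond
                // u32::MAX would need a RoaringTreemap instead.
                deleted_rows.insert(pos as u32);
            }
        }
        Ok(Deletes::Vector(deleted_rows))
    }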
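
A `Deletes::Vector` bitmap also lends itself to being applied via the `RowSelection` type that reader.rs already imports. The helper below is a sketch only; `bitmap_to_row_selection` and its placement are assumptions, not part of this change:

    fn bitmap_to_row_selection(deleted_rows: &RoaringBitmap, total_rows: u32) -> RowSelection {
        use parquet::arrow::arrow_reader::RowSelector;

        let mut selectors = Vec::new();
        let mut cursor = 0u32;
        for row in deleted_rows.iter() {
            if row > cursor {
                // Select the run of live rows before this deleted row.
                selectors.push(RowSelector::select((row - cursor) as usize));
            }
            selectors.push(RowSelector::skip(1));
            cursor = row + 1;
        }
        if cursor < total_rows {
            // Select the remaining live rows after the last delete.
            selectors.push(RowSelector::select((total_rows - cursor) as usize));
        }
        RowSelection::from(selectors)
    }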