This is an automated email from the ASF dual-hosted git repository.

sdd pushed a commit to branch feature/arrow-reader-delete-file-retrieval
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit 50d552a03c78615d7e8ba5190c2b566f6b487b03
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Wed Jan 15 06:58:40 2025 +0000

    feat(wip): arrow reader delete file read
---
 crates/iceberg/src/arrow/reader.rs | 84 ++++++++++++++++++++++++++++++++++++--
 crates/iceberg/src/deletes.rs      | 25 ++++++++++++
 crates/iceberg/src/lib.rs          |  1 +
 crates/iceberg/src/scan.rs         | 13 ++++++
 4 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index b4e15821..91850c80 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -32,7 +32,8 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -47,10 +48,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, PrimitiveType, Schema};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
+use crate::spec::{DataContentType, Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
+use roaring::RoaringBitmap;
+use crate::deletes::Deletes;
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
@@ -257,6 +260,81 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    // Retrieve all delete files concurrently from FileIO and parse them
+    // into `Deletes` objects.
+    async fn get_deletes(
+        delete_file_entries: Vec<FileScanTaskDeleteFile>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<Vec<Deletes>> {
+        if delete_file_entries.is_empty() {
+            return Ok(vec![]);
+        }
+
+        futures::stream::iter(delete_file_entries.into_iter().map(Ok))
+            .map_ok(|entry| {
+                let file_io = file_io.clone();
+                async move {
+                    let FileScanTaskDeleteFile {
+                        file_path,
+                        file_type,
+                        ..
+                    } = entry;
+
+                    let record_batch_stream = Self::create_parquet_record_batch_stream_builder(
+                        &file_path,
+                        file_io,
+                        false,
+                    )
+                    .await?
+                    .build()?
+                    .map_err(|err| Error::new(ErrorKind::DataInvalid, err.to_string()).with_source(err))
+                    .boxed();
+
+                    match file_type {
+                        DataContentType::PositionDeletes => Self::parse_positional_delete_file(record_batch_stream).await,
+                        DataContentType::EqualityDeletes => Self::parse_equality_delete_file(record_batch_stream).await,
+                        _ => Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Expected equality or positional delete",
+                        )),
+                    }
+                }
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_collect()
+            .await
+    }
+
+    async fn parse_positional_delete_file(_record_batches: ArrowRecordBatchStream) -> Result<Deletes> {
+        todo!()
+    }
+
+    async fn parse_equality_delete_file(_record_batches: ArrowRecordBatchStream) -> Result<Deletes> {
+        todo!()
+    }
+
+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        // Start creating the record batch stream, which wraps the parquet file reader
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
     fn build_field_id_set_and_map(
         parquet_schema: &SchemaDescriptor,
         predicate: &BoundPredicate,
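Both parsers above are still `todo!()` stubs. For a sense of where the positional path is headed, here is a minimal sketch (not part of this commit) of how a positional delete file could be folded into the `Deletes::Vector` variant. The column layout follows the Iceberg spec (a `file_path` column followed by a `pos` column of 0-based row positions), and the u32 conversion is a simplifying assumption:

    use arrow_array::{Array, Int64Array, RecordBatch};
    use futures::StreamExt;
    use roaring::RoaringBitmap;

    // Sketch only, not part of this commit. Assumes column index 1 is the
    // spec-defined `pos` column of a positional delete file.
    async fn parse_positional_delete_file_sketch(
        mut record_batches: ArrowRecordBatchStream,
    ) -> Result<Deletes> {
        let mut bitmap = RoaringBitmap::new();
        while let Some(batch) = record_batches.next().await {
            let batch: RecordBatch = batch?;
            let positions = batch
                .column(1)
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "expected i64 `pos` column"))?;
            for pos in positions.iter().flatten() {
                // RoaringBitmap stores u32; a production version would need a
                // 64-bit structure (e.g. RoaringTreemap) for very large files.
                let pos = u32::try_from(pos)
                    .map_err(|_| Error::new(ErrorKind::DataInvalid, "row position exceeds u32::MAX"))?;
                bitmap.insert(pos);
            }
        }
        Ok(Deletes::Vector(bitmap))
    }
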
diff --git a/crates/iceberg/src/deletes.rs b/crates/iceberg/src/deletes.rs
new file mode 100644
index 00000000..268a2131
--- /dev/null
+++ b/crates/iceberg/src/deletes.rs
@@ -0,0 +1,25 @@
+use roaring::RoaringBitmap;
+use arrow_array::RecordBatch;
+
+// Represents a parsed delete file that can be safely stored
+// in the Object Cache.
+pub(crate) enum Deletes {
+    // Positional delete files are parsed into a bitmap of the
+    // deleted row indices for a single data file.
+
+    // TODO: Ignoring the stored rows that are present in
+    //   positional deletes for now. I think they're only used for statistics?
+    Vector(RoaringBitmap),
+
+    // Equality delete files are initially parsed solely as an
+    // unprocessed list of the `RecordBatch`es that they
+    // contain.
+    // I don't think we can do better than this by
+    // storing a Predicate (because the equality deletes use the
+    // field_id rather than the field name, so if we keep this as
+    // a Predicate then a field name change would break it).
+    // Similarly, I don't think we can store this as a BoundPredicate,
+    // as the column order could be different across different data
+    // files, so the accessor in the bound predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
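To make the `Vector` variant concrete: one plausible way to consume it later is to convert the deleted positions into a parquet `RowSelection`, so that the record batch stream never yields those rows. A sketch, assuming `total_rows` comes from the data file's parquet metadata (again, not part of this commit):

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use roaring::RoaringBitmap;

    // Sketch only: build a RowSelection that skips every row position
    // recorded in a positional-delete bitmap.
    fn row_selection_from_deletes(deletes: &RoaringBitmap, total_rows: u64) -> RowSelection {
        let mut selectors = Vec::new();
        let mut cursor: u64 = 0;
        for deleted in deletes {
            let deleted = u64::from(deleted);
            if deleted >= total_rows {
                break;
            }
            if deleted > cursor {
                // Keep the run of rows between the previous delete and this one.
                selectors.push(RowSelector::select((deleted - cursor) as usize));
            }
            selectors.push(RowSelector::skip(1));
            cursor = deleted + 1;
        }
        if cursor < total_rows {
            selectors.push(RowSelector::select((total_rows - cursor) as usize));
        }
        RowSelection::from(selectors)
    }
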
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index eaecfea6..af4d9448 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -86,3 +86,4 @@ mod utils;
 pub mod writer;
 
 mod puffin;
+mod deletes;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 7a100b34..4e7a380c 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -933,6 +933,19 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 }
 
+/// A delete file associated with a file scan task.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct FileScanTaskDeleteFile {
+    /// The delete file path
+    pub file_path: String,
+
+    /// The type of the delete file (positional or equality deletes)
+    pub file_type: DataContentType,
+
+    /// The partition spec id of the delete file
+    pub partition_spec_id: i32,
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {

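Tying the pieces together: the intent is presumably for `FileScanTask` to carry its `FileScanTaskDeleteFile` entries so the reader can resolve them before scanning the data file. A hypothetical glue function, assuming a follow-up commit adds a `deletes: Vec<FileScanTaskDeleteFile>` field to `FileScanTask`:

    // Hypothetical glue, not part of this commit. Assumes FileScanTask gains
    // a `deletes: Vec<FileScanTaskDeleteFile>` field in a follow-up change.
    async fn load_deletes_for_task(
        task: &FileScanTask,
        file_io: &FileIO,
        concurrency_limit: usize,
    ) -> Result<Vec<Deletes>> {
        // Fan out over the task's delete files; `get_deletes` already handles
        // the empty case and bounded concurrency internally.
        ArrowReader::get_deletes(task.deletes.clone(), file_io.clone(), concurrency_limit).await
    }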