This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a20771d  feat: add streaming table and file group APIs (#508)
a20771d is described below

commit a20771d1dafe4fa9b0852360ab69311f425245cf
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 4 15:15:39 2026 -0600

    feat: add streaming table and file group APIs (#508)
---
 crates/core/src/config/read.rs        |  11 +
 crates/core/src/expr/filter.rs        |   8 +-
 crates/core/src/file_group/reader.rs  | 634 ++++++++++++++++++++++++++++++----
 crates/core/src/storage/mod.rs        | 108 ++++++
 crates/core/src/table/mod.rs          | 152 ++++++++
 crates/core/src/table/partition.rs    |   2 +-
 crates/core/src/table/read_options.rs | 211 +++++++++++
 crates/core/tests/table_read_tests.rs | 426 +++++++++++++++++++++++
 8 files changed, 1475 insertions(+), 77 deletions(-)

diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 04c3296..a5acbc5 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -60,6 +60,10 @@ pub enum HudiReadConfig {
     /// When set to true, only [BaseFile]s will be read for optimized reads.
     /// This is only applicable to Merge-On-Read (MOR) tables.
     UseReadOptimizedMode,
+
+    /// Target number of rows per batch for streaming reads.
+    /// This controls the batch size when using streaming APIs.
+    StreamBatchSize,
 }
 
 impl AsRef<str> for HudiReadConfig {
@@ -70,6 +74,7 @@ impl AsRef<str> for HudiReadConfig {
             Self::InputPartitions => "hoodie.read.input.partitions",
             Self::ListingParallelism => "hoodie.read.listing.parallelism",
             Self::UseReadOptimizedMode => "hoodie.read.use.read_optimized.mode",
+            Self::StreamBatchSize => "hoodie.read.stream.batch_size",
         }
     }
 }
@@ -88,6 +93,7 @@ impl ConfigParser for HudiReadConfig {
             HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
             HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
             HudiReadConfig::UseReadOptimizedMode => Some(HudiConfigValue::Boolean(false)),
+            HudiReadConfig::StreamBatchSize => Some(HudiConfigValue::UInteger(1024usize)),
             _ => None,
         }
     }
@@ -120,6 +126,11 @@ impl ConfigParser for HudiReadConfig {
                     bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
+            Self::StreamBatchSize => get_result
+                .and_then(|v| {
+                    usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
+                })
+                .map(HudiConfigValue::UInteger),
         }
     }
 }
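
For illustration, the new key is consumed like any other read config; a minimal sketch of supplying it when constructing a file group reader (the base path is a placeholder, and the constructor mirrors the one used in the tests later in this diff):

    // Hypothetical setup: override the default stream batch size of 1024 rows.
    let reader = FileGroupReader::new_with_options(
        "/tmp/hudi_table",
        [("hoodie.read.stream.batch_size", "4096")],
    )?;
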
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 542d45f..060841f 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -193,7 +193,7 @@ impl SchemableFilter {
         ))
     }
 
-    pub fn apply_comparsion(&self, value: &dyn Datum) -> Result<BooleanArray> {
+    pub fn apply_comparison(&self, value: &dyn Datum) -> Result<BooleanArray> {
         match self.operator {
             ExprOperator::Eq => eq(value, &self.value),
             ExprOperator::Ne => neq(value, &self.value),
@@ -289,7 +289,7 @@ mod tests {
         let schemable = SchemableFilter::try_from((eq_filter, &schema))?;
 
         let test_array = StringArray::from(vec!["test", "other", "test"]);
-        let result = schemable.apply_comparsion(&test_array)?;
+        let result = schemable.apply_comparison(&test_array)?;
         assert_eq!(result, BooleanArray::from(vec![true, false, true]));
 
         // Test integer greater than comparison
@@ -301,7 +301,7 @@ mod tests {
         let schemable = SchemableFilter::try_from((gt_filter, &schema))?;
 
         let test_array = Int64Array::from(vec![40, 50, 60]);
-        let result = schemable.apply_comparsion(&test_array)?;
+        let result = schemable.apply_comparison(&test_array)?;
         assert_eq!(result, BooleanArray::from(vec![false, false, true]));
 
         Ok(())
@@ -329,7 +329,7 @@ mod tests {
             };
 
             let schemable = SchemableFilter::try_from((filter, &schema))?;
-            let result = schemable.apply_comparsion(&test_array)?;
+            let result = schemable.apply_comparison(&test_array)?;
             assert_eq!(
                 result,
                 BooleanArray::from(expected),
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index defaf7d..005b48f 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -32,13 +32,15 @@ use crate::merge::record_merger::RecordMerger;
 use crate::metadata::merger::FilesPartitionMerger;
 use crate::metadata::meta_field::MetaField;
 use crate::metadata::table_record::FilesPartitionRecord;
-use crate::storage::Storage;
+use crate::storage::{ParquetReadOptions, Storage};
+use crate::table::ReadOptions;
 use crate::table::builder::OptionResolver;
 use crate::timeline::selector::InstantRange;
 use arrow::compute::and;
 use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
-use futures::TryFutureExt;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryFutureExt};
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -108,64 +110,6 @@ impl FileGroupReader {
             })
     }
 
-    fn create_filtering_mask_for_base_file_records(
-        &self,
-        records: &RecordBatch,
-    ) -> Result<Option<BooleanArray>> {
-        let populates_meta_fields: bool = self
-            .hudi_configs
-            .get_or_default(HudiTableConfig::PopulatesMetaFields)
-            .into();
-        if !populates_meta_fields {
-            // If meta fields are not populated, commit time filtering is not applicable.
-            return Ok(None);
-        }
-
-        let mut and_filters: Vec<SchemableFilter> = Vec::new();
-        let schema = MetaField::schema();
-        if let Some(start) = self
-            .hudi_configs
-            .try_get(HudiReadConfig::FileGroupStartTimestamp)
-            .map(|v| -> String { v.into() })
-        {
-            let filter: Filter =
-                Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?;
-            let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
-            and_filters.push(filter);
-        } else {
-            // If start timestamp is not provided, the query is snapshot or time-travel, so
-            // commit time filtering is not needed as the base file being read is already
-            // filtered and selected by the timeline.
-            return Ok(None);
-        }
-
-        if let Some(end) = self
-            .hudi_configs
-            .try_get(HudiReadConfig::FileGroupEndTimestamp)
-            .map(|v| -> String { v.into() })
-        {
-            let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?;
-            let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
-            and_filters.push(filter);
-        }
-
-        if and_filters.is_empty() {
-            return Ok(None);
-        }
-
-        let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
-        for filter in &and_filters {
-            let col_name = filter.field.name().as_str();
-            let col_values = records
-                .column_by_name(col_name)
-                .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
-
-            let comparison = filter.apply_comparsion(col_values)?;
-            mask = and(&mask, &comparison)?;
-        }
-        Ok(Some(mask))
-    }
-
     /// Reads the data from the base file at the given relative path.
     ///
     /// # Arguments
@@ -183,12 +127,7 @@ impl FileGroupReader {
             .map_err(|e| ReadFileSliceError(format!("Failed to read path {relative_path}: {e:?}")))
             .await?;
 
-        if let Some(mask) = self.create_filtering_mask_for_base_file_records(&records)? {
-            filter_record_batch(&records, &mask)
-                .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
-        } else {
-            Ok(records)
-        }
+        apply_commit_time_filter(&self.hudi_configs, records)
     }
 
     /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but blocking.
@@ -325,6 +264,173 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, log_file_paths))
     }
 
+    // =========================================================================
+    // Streaming Read APIs
+    // =========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a true
+    /// streaming iterator from the underlying parquet file, yielding batches as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the collect-and-merge approach
+    /// and yields the merged result as a single batch. This limitation exists because
+    /// streaming merge of base files with log files is not yet implemented.
+    ///
+    /// # Limitations
+    ///
+    /// - The `projection` and `row_predicate` fields in [ReadOptions] are not yet
+    ///   implemented for streaming reads. Only `batch_size` is currently supported.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is `'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, &options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+
+        if use_read_optimized {
+            return self.read_base_file_stream(base_file_path, options).await;
+        }
+
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        if log_file_paths.is_empty() {
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a base file as a stream of record batches.
+    ///
+    /// # Limitations
+    ///
+    /// Currently only `batch_size` from [ReadOptions] is used. The `projection` and
+    /// `row_predicate` fields are not yet implemented.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        use crate::config::table::BaseFileFormatValue;
+
+        // Validate base file format is parquet
+        let base_file_format: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BaseFileFormat)
+            .into();
+        if !base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
+            return Err(ReadFileSliceError(format!(
+                "Streaming read only supports parquet format, got: {base_file_format}"
+            )));
+        }
+
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+        let parquet_options = ParquetReadOptions::new().with_batch_size(batch_size);
+
+        let hudi_configs = self.hudi_configs.clone();
+        let path = relative_path.to_string();
+
+        let parquet_stream = self
+            .storage
+            .get_parquet_file_stream(&path, parquet_options)
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path {path}: {e:?}")))
+            .await?;
+
+        // Apply the same filtering logic as read_file_slice_by_base_file_path
+        let stream = parquet_stream.into_stream().filter_map(move |result| {
+            let hudi_configs = hudi_configs.clone();
+            async move {
+                match result {
+                    Err(e) => Some(Err(ReadFileSliceError(format!(
+                        "Failed to read batch: {e:?}"
+                    )))),
+                    Ok(batch) => match apply_commit_time_filter(&hudi_configs, batch) {
+                        Err(e) => Some(Err(e)),
+                        Ok(filtered) if filtered.num_rows() > 0 => Some(Ok(filtered)),
+                        Ok(_) => None,
+                    },
+                }
+            }
+        });
+
+        Ok(Box::pin(stream))
+    }
+
     // =========================================================================
     // Metadata Table File Slice Reading
     // =========================================================================
@@ -425,6 +531,70 @@ impl FileGroupReader {
     }
 }
 
+/// Creates a commit time filtering mask based on the provided configs.
+///
+/// Returns `None` if no filtering is needed (meta fields disabled or no start timestamp).
+fn create_commit_time_filter_mask(
+    hudi_configs: &HudiConfigs,
+    batch: &RecordBatch,
+) -> Result<Option<BooleanArray>> {
+    let populates_meta_fields: bool = hudi_configs
+        .get_or_default(HudiTableConfig::PopulatesMetaFields)
+        .into();
+    if !populates_meta_fields {
+        return Ok(None);
+    }
+
+    let start_ts = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() });
+    if start_ts.is_none() {
+        // If start timestamp is not provided, the query is snapshot or time-travel
+        return Ok(None);
+    }
+
+    let mut and_filters: Vec<SchemableFilter> = Vec::new();
+    let schema = MetaField::schema();
+
+    if let Some(start) = start_ts {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, schema.as_ref()))?);
+    }
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, schema.as_ref()))?);
+    }
+
+    if and_filters.is_empty() {
+        return Ok(None);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = batch
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
+        let comparison = filter.apply_comparison(col_values)?;
+        mask = and(&mask, &comparison)?;
+    }
+
+    Ok(Some(mask))
+}
+
+/// Apply commit time filtering to a record batch.
+fn apply_commit_time_filter(hudi_configs: &HudiConfigs, batch: RecordBatch) -> Result<RecordBatch> {
+    match create_commit_time_filter_mask(hudi_configs, &batch)? {
+        Some(mask) => filter_record_batch(&batch, &mask)
+            .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}"))),
+        None => Ok(batch),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -533,7 +703,7 @@ mod tests {
     }
 
     #[test]
-    fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
+    fn test_create_commit_time_filter_mask() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
         let records = create_test_record_batch()?;
 
@@ -545,12 +715,12 @@ mod tests {
                 (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
             ],
         )?;
-        let mask = reader.create_filtering_mask_for_base_file_records(&records)?;
+        let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None, "Commit time filtering should not be needed");
 
         // Test case 2: No commit time filtering options
         let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
-        let mask = reader.create_filtering_mask_for_base_file_records(&records)?;
+        let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None);
 
         // Test case 3: Filtering commit time > '2'
@@ -558,7 +728,7 @@ mod tests {
             &base_uri,
             [(HudiReadConfig::FileGroupStartTimestamp, "2")],
         )?;
-        let mask = reader.create_filtering_mask_for_base_file_records(&records)?;
+        let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(
             mask,
             Some(BooleanArray::from(vec![false, false, true, true, true])),
@@ -570,7 +740,7 @@ mod tests {
             &base_uri,
             [(HudiReadConfig::FileGroupEndTimestamp, "4")],
         )?;
-        let mask = reader.create_filtering_mask_for_base_file_records(&records)?;
+        let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None, "Commit time filtering should not be needed");
 
         // Test case 5: Filtering commit time > '2' and <= '4'
@@ -581,7 +751,7 @@ mod tests {
                 (HudiReadConfig::FileGroupEndTimestamp, "4"),
             ],
         )?;
-        let mask = reader.create_filtering_mask_for_base_file_records(&records)?;
+        let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(
             mask,
             Some(BooleanArray::from(vec![false, false, true, true, false])),
@@ -733,6 +903,326 @@ mod tests {
         Ok(())
     }
 
+    // =========================================================================
+    // Streaming API Tests
+    // =========================================================================
+
+    /// Helper to create a FileGroupReader without using block_on (safe for async tests).
+    fn create_test_reader(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs, empty_options())
+    }
+
+    /// Helper to create a FileGroupReader with read-optimized mode.
+    fn create_test_reader_read_optimized(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(
+            hudi_configs,
+            [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
+        )
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_base_file_only() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+
+        let options = ReadOptions::default();
+        let result = reader.read_file_slice_stream(&file_slice, &options).await;
+
+        match result {
+            Ok(mut stream) => {
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+                // Should have read some batches
+                assert!(!batches.is_empty(), "Should produce at least one batch");
+                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert!(total_rows > 0, "Should read at least one row");
+            }
+            Err(e) => {
+                // Expected for missing test data
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_base_file_only() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file_path = TEST_SAMPLE_BASE_FILE;
+        let log_file_paths: Vec<&str> = vec![];
+        let options = ReadOptions::default();
+
+        let result = reader
+            .read_file_slice_from_paths_stream(base_file_path, log_file_paths, &options)
+            .await;
+
+        match result {
+            Ok(mut stream) => {
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+                assert!(!batches.is_empty(), "Should produce at least one batch");
+            }
+            Err(e) => {
+                // Expected for missing test data
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path") || error_msg.contains("not found"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_read_optimized_mode() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader_read_optimized(&base_uri)?;
+
+        let base_file_path = TEST_SAMPLE_BASE_FILE;
+        // Even with log files, read-optimized mode should ignore them
+        let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
+        let options = ReadOptions::default();
+
+        let result = reader
+            .read_file_slice_from_paths_stream(base_file_path, log_file_paths, &options)
+            .await;
+
+        match result {
+            Ok(mut stream) => {
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+                // In read-optimized mode, log files are ignored - should still work
+                assert!(
+                    !batches.is_empty(),
+                    "Should produce batches in read-optimized mode"
+                );
+            }
+            Err(e) => {
+                // Expected for missing test data
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_with_log_files() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file_path = TEST_SAMPLE_BASE_FILE;
+        let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
+        let options = ReadOptions::default();
+
+        let result = reader
+            .read_file_slice_from_paths_stream(base_file_path, log_file_paths, &options)
+            .await;
+
+        match result {
+            Ok(mut stream) => {
+                // With log files, falls back to collect+merge and yields single batch
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+                // Should have exactly one batch (the merged result from fallback path)
+                assert_eq!(
+                    batches.len(),
+                    1,
+                    "Should produce exactly one batch in fallback mode"
+                );
+            }
+            Err(e) => {
+                // Expected for missing test data
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+
+        // Use very small batch size
+        let options = ReadOptions {
+            partition_filters: vec![],
+            projection: None,
+            row_predicate: None,
+            batch_size: Some(1),
+            as_of_timestamp: None,
+        };
+
+        let result = reader.read_file_slice_stream(&file_slice, &options).await;
+
+        match result {
+            Ok(mut stream) => {
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+                // With small batch size, should get multiple batches
+                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert!(total_rows > 0, "Should read at least one row");
+            }
+            Err(e) => {
+                // Expected for missing test data
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_error_on_invalid_file() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        // Use a valid file name format but pointing to a non-existent file
+        let base_file = BaseFile::from_str(
+            "00000000-0000-0000-0000-000000000000-0_0-0-0_00000000000000000.parquet",
+        )?;
+        let file_slice = FileSlice::new(base_file, String::new());
+
+        let options = ReadOptions::default();
+        let result = reader.read_file_slice_stream(&file_slice, &options).await;
+
+        // Should return error for non-existent file
+        match result {
+            Ok(_) => panic!("Should return error for non-existent file"),
+            Err(e) => {
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file")
+                        || error_msg.contains("Object at location"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Helper to create a FileGroupReader with commit time filtering options.
+    fn create_test_reader_with_commit_time_filter(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(
+            hudi_configs,
+            [
+                (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
+                (HudiReadConfig::FileGroupEndTimestamp.as_ref(), "4"),
+            ],
+        )
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_commit_time_filtering() -> Result<()> {
+        use futures::StreamExt;
+
+        let base_uri = get_base_uri_with_valid_props_minimum();
+
+        // Create reader with commit time filtering options
+        let reader = create_test_reader_with_commit_time_filter(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+        let options = ReadOptions::default();
+
+        let result = reader.read_file_slice_stream(&file_slice, &options).await;
+
+        match result {
+            Ok(mut stream) => {
+                // Collect all batches and verify commit time filtering was applied
+                let mut batches = Vec::new();
+                while let Some(batch_result) = stream.next().await {
+                    batches.push(batch_result?);
+                }
+
+                // Verify streaming with commit time filtering completed successfully.
+                // The commit time filtering is applied via apply_commit_time_filter
+                // in read_base_file_stream for each batch.
+                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+                // Just verify we can process all batches - exact count depends on test data
+                assert!(
+                    batches.is_empty() || total_rows > 0,
+                    "Non-empty batches should have rows"
+                );
+            }
+            Err(e) => {
+                // Expected for missing test data - verify error is file-related
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("Failed to read path")
+                        || error_msg.contains("not found")
+                        || error_msg.contains("No such file")
+                        || error_msg.contains("Object at location"),
+                    "Expected file not found error, got: {error_msg}"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
     // =========================================================================
     // Metadata Table File Slice Reading Tests
     // =========================================================================
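
Taken together, the new reader APIs are meant to be consumed as in the doc example above; a condensed sketch, with `reader`, `file_slice`, and error handling assumed from surrounding context:

    use futures::StreamExt;

    // Stream a single file slice; MOR slices with log files currently yield one merged batch.
    let options = ReadOptions::new().with_batch_size(4096);
    let mut stream = reader.read_file_slice_stream(&file_slice, &options).await?;
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
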
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 5f4b019..14af978 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -24,9 +24,11 @@ use std::sync::Arc;
 
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
 use async_recursion::async_recursion;
 use bytes::Bytes;
 use futures::StreamExt;
+use futures::stream::BoxStream;
 use object_store::path::Path as ObjPath;
 use object_store::{ObjectStore, parse_url_opts};
 use parquet::arrow::async_reader::ParquetObjectReader;
@@ -47,6 +49,70 @@ pub mod file_metadata;
 pub mod reader;
 pub mod util;
 
+/// Options for reading Parquet files with streaming.
+#[derive(Clone, Debug, Default)]
+pub struct ParquetReadOptions {
+    /// Target batch size (number of rows per batch).
+    pub batch_size: Option<usize>,
+    /// Column projection (indices of columns to read).
+    pub projection: Option<Vec<usize>>,
+}
+
+impl ParquetReadOptions {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+}
+
+/// A stream of record batches from a Parquet file with its schema.
+///
+/// # Implementation Note
+///
+/// This struct uses a `BoxStream` internally, which requires a heap allocation per file.
+/// For high-performance scenarios with many small files, this adds minimal overhead since:
+/// - Batch processing amortizes the allocation cost (each batch may contain thousands of rows)
+/// - The streaming benefit (lazy evaluation, reduced memory) outweighs the Box allocation cost
+/// - For typical parquet files (>10MB), the Box overhead (~40-80 bytes) is negligible
+pub struct ParquetFileStream {
+    schema: SchemaRef,
+    stream: BoxStream<'static, std::result::Result<RecordBatch, parquet::errors::ParquetError>>,
+}
+
+impl ParquetFileStream {
+    /// Returns the Arrow schema of the Parquet file.
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Consumes self and returns the inner stream.
+    pub fn into_stream(
+        self,
+    ) -> BoxStream<'static, std::result::Result<RecordBatch, parquet::errors::ParquetError>> {
+        self.stream
+    }
+}
+
+impl futures::Stream for ParquetFileStream {
+    type Item = std::result::Result<RecordBatch, parquet::errors::ParquetError>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.stream.as_mut().poll_next(cx)
+    }
+}
+
 #[allow(dead_code)]
 #[derive(Clone, Debug)]
 pub struct Storage {
@@ -206,6 +272,48 @@ impl Storage {
         Ok(concat_batches(&schema, &batches)?)
     }
 
+    /// Get a streaming reader for a Parquet file.
+    ///
+    /// Returns a [ParquetFileStream] that yields record batches as they are read,
+    /// without loading all data into memory at once.
+    ///
+    /// # Arguments
+    /// * `relative_path` - The relative path to the Parquet file.
+    /// * `options` - Options for reading the Parquet file (batch size, projection).
+    pub async fn get_parquet_file_stream(
+        &self,
+        relative_path: &str,
+        options: ParquetReadOptions,
+    ) -> Result<ParquetFileStream> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let obj_store = self.object_store.clone();
+        let meta = obj_store.head(&obj_path).await?;
+
+        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
+        let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+
+        if let Some(batch_size) = options.batch_size {
+            builder = builder.with_batch_size(batch_size);
+        }
+
+        if let Some(projection) = options.projection {
+            let projection_mask = parquet::arrow::ProjectionMask::roots(
+                builder.parquet_schema(),
+                projection.iter().copied(),
+            );
+            builder = builder.with_projection(projection_mask);
+        }
+
+        let schema = builder.schema().clone();
+        let stream = builder.build()?;
+
+        Ok(ParquetFileStream {
+            schema,
+            stream: Box::pin(stream),
+        })
+    }
+
     pub async fn get_storage_reader(&self, relative_path: &str) -> Result<StorageReader> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
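
For orientation, a small sketch of how the new storage-level API composes; the `storage` handle and the parquet path are placeholders, and the projection indices are illustrative only:

    use futures::StreamExt;

    // Hypothetical usage of the streaming parquet reader added above.
    let options = ParquetReadOptions::new()
        .with_batch_size(1024)
        .with_projection(vec![0, 2]); // root column indices, illustrative
    let file_stream = storage.get_parquet_file_stream("partition/file.parquet", options).await?;
    println!("schema: {:?}", file_stream.schema());
    let mut stream = file_stream.into_stream();
    while let Some(batch) = stream.next().await {
        let batch = batch?; // yields parquet::errors::ParquetError on failure
        // process the batch...
    }
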
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 80a00d7..b85c2b5 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -91,8 +91,11 @@ pub mod builder;
 mod fs_view;
 mod listing;
 pub mod partition;
+mod read_options;
 mod validation;
 
+pub use read_options::ReadOptions;
+
 use crate::Result;
 use crate::config::HudiConfigs;
 use crate::config::read::HudiReadConfig;
@@ -790,6 +793,155 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // =========================================================================
+    // Streaming Read APIs
+    // =========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It returns a stream
+    /// that yields record batches as they are read, without loading all data into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, &options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        use futures::stream;
+
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let Some(timestamp) = timestamp else {
+            // No commit timestamp means empty table - return empty stream (consistent with read_snapshot)
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of returning
+    /// all batches at once, it returns a stream that yields record batches as they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `options` - Read options including partition filters, batch size, and projection.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices. Errors from individual file
+    /// slices are propagated through the stream.
+    ///
+    /// # Limitations
+    ///
+    /// - The `row_predicate` field in [ReadOptions] is not yet implemented for streaming reads.
+    /// - For MOR tables with log files, streaming falls back to the collect-and-merge approach.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new()
+    ///     .with_filters([("city", "=", "san_francisco")])
+    ///     .with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream(
+        &self,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters: Vec<Filter> = options
+            .partition_filters
+            .iter()
+            .map(|(f, o, v)| Filter::try_from((f.as_str(), o.as_str(), v.as_str())))
+            .collect::<Result<Vec<_>>>()?;
+        let file_slices = self.get_file_slices_internal(timestamp, &filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Extract options to pass to each file slice read.
+        // Note: row_predicate is not yet supported in streaming base file reads.
+        let batch_size = options.batch_size;
+        let projection = options.projection.clone();
+
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let projection = projection.clone();
+            let options = ReadOptions {
+                partition_filters: vec![],
+                projection,
+                row_predicate: None, // Not yet implemented in streaming reads
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader
+                    .read_file_slice_stream(&file_slice, &options)
+                    .await
+            }
+        });
+
+        // Chain all file slice streams together, propagating errors to the caller.
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .flat_map(|result| match result {
+                Ok(file_stream) => file_stream.left_stream(),
+                Err(e) => stream::once(async move { Err(e) }).right_stream(),
+            });
+
+        Ok(Box::pin(combined_stream))
+    }
 }
 
 #[cfg(test)]
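
The `left_stream`/`right_stream` combinator used in `read_snapshot_stream` can be seen in isolation with toy types; this is only a sketch of the pattern, not table code:

    use futures::stream::{self, StreamExt};

    // Each element is either an opened per-file stream or an open error; flat_map
    // flattens successes and turns each error into a single-item error stream, so
    // the caller always sees one unified stream of Result items.
    async fn demo() {
        let opened: Vec<Result<_, String>> = vec![
            Ok(stream::iter(vec![Ok::<i32, String>(1), Ok(2)])),
            Err("failed to open file slice".to_string()),
        ];
        let combined = stream::iter(opened).flat_map(|r| match r {
            Ok(s) => s.left_stream(),
            Err(e) => stream::once(async move { Err(e) }).right_stream(),
        });
        let items: Vec<Result<i32, String>> = combined.collect().await;
        assert_eq!(items.len(), 3); // Ok(1), Ok(2), then the propagated error
    }
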
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 069dbde..79aa32d 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -118,7 +118,7 @@ impl PartitionPruner {
         self.and_filters.iter().all(|filter| {
             match segments.get(filter.field.name()) {
                 Some(segment_value) => {
-                    match filter.apply_comparsion(segment_value) {
+                    match filter.apply_comparison(segment_value) {
                         Ok(scalar) => scalar.value(0),
                         Err(_) => true, // Include the partition when comparison error occurs
                     }
diff --git a/crates/core/src/table/read_options.rs b/crates/core/src/table/read_options.rs
new file mode 100644
index 0000000..499546d
--- /dev/null
+++ b/crates/core/src/table/read_options.rs
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Read options for streaming reads.
+
+use arrow_array::{BooleanArray, RecordBatch};
+
+/// A row-level predicate function for filtering records.
+pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+
+/// A partition filter tuple: (field_name, operator, value).
+/// Example: ("city", "=", "san_francisco")
+pub type PartitionFilter = (String, String, String);
+
+/// Options for reading file slices with streaming APIs.
+///
+/// This struct provides configuration for:
+/// - Partition filters (filtering partitions)
+/// - Column projection (which columns to read)
+/// - Row-level predicates (filtering rows)
+/// - Batch size control (rows per batch)
+/// - Time travel (as-of timestamp)
+///
+/// # Current Limitations
+///
+/// Not all options are fully supported in streaming APIs:
+/// - `batch_size` and `partition_filters` are fully supported.
+/// - `projection` is passed through to the streaming implementation but is not yet
+///   applied at the parquet read level. This is because projection requires mapping
+///   column names to column indices via schema lookup, which is not yet implemented.
+/// - `row_predicate` is not yet implemented in streaming reads. The predicate function
+///   is accepted but will be ignored.
+///
+/// # Example
+///
+/// ```ignore
+/// use hudi::table::ReadOptions;
+///
+/// let options = ReadOptions::new()
+///     .with_filters([("city", "=", "san_francisco")])
+///     .with_batch_size(4096);
+/// ```
+#[derive(Default)]
+pub struct ReadOptions {
+    /// Partition filters. Each filter is a tuple of (field, operator, value).
+    pub partition_filters: Vec<PartitionFilter>,
+
+    /// Column names to project (select). If None, all columns are read.
+    pub projection: Option<Vec<String>>,
+
+    /// Row-level filter predicate. Applied after reading each batch.
+    pub row_predicate: Option<RowPredicate>,
+
+    /// Target number of rows per batch.
+    pub batch_size: Option<usize>,
+
+    /// Timestamp for time travel queries (as-of).
+    pub as_of_timestamp: Option<String>,
+}
+
+impl ReadOptions {
+    /// Creates a new ReadOptions with default values.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets partition filters.
+    ///
+    /// # Arguments
+    /// * `filters` - Partition filters as tuples of (field, operator, value)
+    pub fn with_filters<I, S1, S2, S3>(mut self, filters: I) -> Self
+    where
+        I: IntoIterator<Item = (S1, S2, S3)>,
+        S1: Into<String>,
+        S2: Into<String>,
+        S3: Into<String>,
+    {
+        self.partition_filters = filters
+            .into_iter()
+            .map(|(f, o, v)| (f.into(), o.into(), v.into()))
+            .collect();
+        self
+    }
+
+    /// Sets the column projection (which columns to read).
+    ///
+    /// # Arguments
+    /// * `columns` - Column names to project
+    pub fn with_projection<I, S>(mut self, columns: I) -> Self
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        self.projection = Some(columns.into_iter().map(|s| s.into()).collect());
+        self
+    }
+
+    /// Sets the row-level predicate for filtering records.
+    ///
+    /// **Note:** Row predicates are not yet implemented in streaming reads.
+    /// The predicate will be accepted but ignored during streaming operations.
+    ///
+    /// # Arguments
+    /// * `predicate` - A function that takes a RecordBatch and returns a BooleanArray mask
+    pub fn with_row_predicate<F>(mut self, predicate: F) -> Self
+    where
+        F: Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync + 'static,
+    {
+        self.row_predicate = Some(Box::new(predicate));
+        self
+    }
+
+    /// Sets the target batch size (rows per batch).
+    ///
+    /// # Arguments
+    /// * `size` - Target number of rows per batch
+    pub fn with_batch_size(mut self, size: usize) -> Self {
+        self.batch_size = Some(size);
+        self
+    }
+
+    /// Sets the as-of timestamp for time travel queries.
+    ///
+    /// # Arguments
+    /// * `timestamp` - The timestamp to query as of
+    pub fn with_as_of_timestamp<S: Into<String>>(mut self, timestamp: S) -> Self {
+        self.as_of_timestamp = Some(timestamp.into());
+        self
+    }
+}
+
+impl std::fmt::Debug for ReadOptions {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ReadOptions")
+            .field("filters", &self.partition_filters)
+            .field("projection", &self.projection)
+            .field(
+                "row_predicate",
+                &self.row_predicate.as_ref().map(|_| "<predicate>"),
+            )
+            .field("batch_size", &self.batch_size)
+            .field("as_of_timestamp", &self.as_of_timestamp)
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_with_projection() {
+        let options = ReadOptions::new().with_projection(["col1", "col2", "col3"]);
+
+        assert_eq!(
+            options.projection,
+            Some(vec![
+                "col1".to_string(),
+                "col2".to_string(),
+                "col3".to_string()
+            ])
+        );
+    }
+
+    #[test]
+    fn test_with_row_predicate() {
+        let options = ReadOptions::new()
+            .with_row_predicate(|batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])));
+
+        assert!(options.row_predicate.is_some());
+    }
+
+    #[test]
+    fn test_with_as_of_timestamp() {
+        let options = ReadOptions::new().with_as_of_timestamp("20240101120000000");
+
+        assert_eq!(
+            options.as_of_timestamp,
+            Some("20240101120000000".to_string())
+        );
+    }
+
+    #[test]
+    fn test_debug_format() {
+        let options = ReadOptions::new()
+            .with_filters([("city", "=", "sf")])
+            .with_projection(["id"])
+            .with_batch_size(1000);
+
+        let debug_str = format!("{options:?}");
+        assert!(debug_str.contains("ReadOptions"));
+        assert!(debug_str.contains("filters"));
+        assert!(debug_str.contains("projection"));
+        assert!(debug_str.contains("batch_size"));
+    }
+}
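
A combined builder sketch mirroring the tests above (crate path as used in the integration tests below; per the limitations documented in this file, the predicate is accepted but currently ignored by streaming reads):

    use arrow_array::BooleanArray;
    use hudi_core::table::ReadOptions;

    // All builder methods chain; unspecified fields keep their defaults.
    let options = ReadOptions::new()
        .with_filters([("city", "=", "san_francisco")])
        .with_projection(["id", "name"])
        .with_batch_size(4096)
        .with_as_of_timestamp("20240101120000000")
        .with_row_predicate(|batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])));
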
diff --git a/crates/core/tests/table_read_tests.rs b/crates/core/tests/table_read_tests.rs
index 218f7b3..9b36a6d 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -553,6 +553,432 @@ mod v8_tables {
             Ok(())
         }
     }
+
+    /// Streaming query tests for v8 tables
+    mod streaming_queries {
+        use super::*;
+        use futures::StreamExt;
+        use hudi_core::table::ReadOptions;
+
+        #[tokio::test]
+        async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+            let base_url = SampleTable::V8Empty.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_snapshot_stream_basic() -> Result<()> {
+            let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+            let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            // Request small batch size
+            let options = ReadOptions::new().with_batch_size(1);
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            // With batch_size=1 and 4 rows, we expect multiple batches, but the
+            // exact number depends on both the batch_size setting and the Parquet
+            // file's internal row group structure.
+            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+            assert_eq!(total_rows, 4, "Total rows should match expected count");
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+            let base_url = SampleTable::V8ComplexkeygenHivestyle.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            let options = ReadOptions::new().with_filters([
+                ("byteField", ">=", "10"),
+                ("byteField", "<", "20"),
+                ("shortField", "!=", "100"),
+            ]);
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(
+                !batches.is_empty(),
+                "Should produce at least one batch for the given partition 
filters"
+            );
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]);
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_file_slice_stream_basic() -> Result<()> {
+            let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            // Get file slices first
+            let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+            assert!(
+                !file_slices.is_empty(),
+                "Should have at least one file slice"
+            );
+
+            let options = ReadOptions::new();
+            let file_slice = &file_slices[0];
+            let mut stream = hudi_table
+                .read_file_slice_stream(file_slice, &options)
+                .await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Verify we got records
+            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+            assert!(total_rows > 0, "Should read at least one row");
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+            let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+            let file_slice = &file_slices[0];
+
+            // Test with small batch size
+            let options = ReadOptions::new().with_batch_size(1);
+            let mut stream = hudi_table
+                .read_file_slice_stream(file_slice, &options)
+                .await?;
+
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+            assert_eq!(total_rows, 4, "Should read all 4 rows");
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+            // Test MOR table with log files - should still work (falls back to collect+merge)
+            // V8Trips8I3U1D: 8 inserts, 3 updates (A, J, G fare=0), 2 deletes (F, J)
+            let base_url = QuickstartTripsTable::V8Trips8I3U1D.url_to_mor_avro();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+            // Verify total row count - should have 6 rows (8 inserts - 2 deletes)
+            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+            assert_eq!(total_rows, 6, "Should have 6 rows (8 inserts - 2 deleted)");
+
+            // Verify deleted riders are not present
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+            let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records);
+            let riders: Vec<_> = uuid_rider_and_fare
+                .iter()
+                .map(|(_, rider, _)| rider.as_str())
+                .collect();
+
+            let deleted_riders = ["rider-F", "rider-J"];
+            assert!(
+                riders.iter().all(|rider| !deleted_riders.contains(rider)),
+                "Deleted riders should not be present in streaming results"
+            );
+
+            Ok(())
+        }
+    }
+}
+
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+    use super::*;
+    use futures::StreamExt;
+    use hudi_core::table::ReadOptions;
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_basic() -> Result<()> {
+        for base_url in SampleTable::V6Nonpartitioned.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // With batch_size=1 and 4 rows, we expect multiple batches, but the
+        // exact number depends on both the batch_size setting and the Parquet
+        // file's internal row group structure.
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Total rows should match expected count");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new().with_filters([
+            ("byteField", ">=", "10"),
+            ("byteField", "<", "20"),
+            ("shortField", "!=", "100"),
+        ]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(
+            !batches.is_empty(),
+            "Should produce at least one batch for the given partition filters"
+        );
+        let schema = &batches[0].schema();
+        let records = concat_batches(schema, &batches)?;
+
+        let sample_data = SampleTable::sample_data_order_by_id(&records);
+        assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_basic() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Get file slices first
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        assert!(
+            !file_slices.is_empty(),
+            "Should have at least one file slice"
+        );
+
+        let options = ReadOptions::new();
+        let file_slice = &file_slices[0];
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        // Verify we got records
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert!(total_rows > 0, "Should read at least one row");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        let file_slice = &file_slices[0];
+
+        // Test with small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Should read all 4 rows");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+        // Test MOR table with log files - should still work (falls back to collect+merge)
+        let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+        // Verify total row count
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 8, "Should have 8 rows (8 inserts)");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_successful_read() -> Result<()> {
+        // This test verifies that reading from a valid table succeeds without errors.
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // All reads should succeed without error
+        while let Some(result) = stream.next().await {
+            assert!(result.is_ok(), "Reading should not produce errors");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table_no_timestamp() -> Result<()> {
+        // For an empty table with no commit timestamp, streaming reads should return
+        // an empty stream (consistent with read_snapshot behavior).
+        let base_url = SampleTable::V6Empty.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut count = 0;
+        while (stream.next().await).is_some() {
+            count += 1;
+        }
+        assert_eq!(count, 0, "Empty table should produce no batches");
+        Ok(())
+    }
 }
 
 /// Test module for tables with metadata table (MDT) enabled.

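For readers following this commit, here is a minimal usage sketch of the new streaming
read API, consuming batches incrementally instead of collecting them up front as the
tests above do for verification. The `hudi_core::table::Table` import path and the
boxed error type are assumptions made for illustration; `ReadOptions`, `with_batch_size`,
`read_snapshot_stream`, and the `futures::StreamExt`-based consumption mirror the test code.

    use futures::StreamExt;
    use hudi_core::table::{ReadOptions, Table};

    // Sketch: stream a snapshot read and count rows batch-by-batch.
    async fn count_rows(base_uri: &str) -> Result<usize, Box<dyn std::error::Error>> {
        let table = Table::new(base_uri).await?;

        // Batch size is a hint; actual batch boundaries also depend on the
        // Parquet row group layout, as noted in the tests above.
        let options = ReadOptions::new().with_batch_size(1024);
        let mut stream = table.read_snapshot_stream(&options).await?;

        let mut total_rows = 0;
        while let Some(batch) = stream.next().await {
            // Each item is a Result wrapping an Arrow RecordBatch.
            total_rows += batch?.num_rows();
        }
        Ok(total_rows)
    }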