Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659311067


##########
crates/core/src/file_group/reader.rs:
##########
@@ -425,6 +604,54 @@ impl FileGroupReader {
     }
 }
 
+/// Helper function to create a commit time filter mask for a record batch.
+/// This is used by the streaming read APIs.
+fn create_commit_time_filter(
+    hudi_configs: &HudiConfigs,
+    records: &RecordBatch,
+) -> Result<Option<BooleanArray>> {
+    let mut and_filters: Vec<SchemableFilter> = Vec::new();
+    let schema = MetaField::schema();
+
+    if let Some(start) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter: Filter =
+            Filter::try_from((MetaField::CommitTime.as_ref(), ">", 
start.as_str()))?;
+        let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+        and_filters.push(filter);
+    } else {
+        // If start timestamp is not provided, no filtering needed
+        return Ok(None);
+    }
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", 
end.as_str()))?;
+        let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+        and_filters.push(filter);
+    }
+
+    if and_filters.is_empty() {
+        return Ok(None);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = records
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not 
found")))?;
+
+        let comparison = filter.apply_comparsion(col_values)?;

Review Comment:
   Typo in method name: 'apply_comparsion' should be 'apply_comparison'. This 
typo exists in the existing codebase (line 165) and is propagated here.



##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +327,183 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
     }
 
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, 
log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of 
[FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+        let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+        if base_file_only {
+            // True streaming: return a stream from the parquet file
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }

Review Comment:
   For MOR tables with log files, the streaming implementation falls back to 
the non-streaming read_file_slice_from_paths method and returns the entire 
merged result as a single batch. This defeats the purpose of streaming for MOR 
tables and could cause high memory usage with large file slices. Consider 
documenting this limitation prominently in the function documentation, or 
implementing proper streaming merge support.



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `filters` - Partition filters to apply.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(empty_filters(), 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream<I, S>(
+        &self,
+        filters: I,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            // Return empty stream if no commits
+            return Ok(Box::pin(stream::empty()));
+        };

Review Comment:
   Inconsistent error handling compared to read_file_slice_stream. The 
read_file_slice_stream function (line 841) returns an error when no commit 
timestamp is available, but read_snapshot_stream (line 889-891) returns an 
empty stream. This inconsistency could confuse API users about expected 
behavior. Consider aligning these behaviors for consistency.
   ```suggestion
           let timestamp = self
               .timeline
               .get_latest_commit_timestamp_as_option()
               .ok_or_else(|| "No commit timestamp available for snapshot 
read".into())?;
   ```



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `filters` - Partition filters to apply.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(empty_filters(), 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream<I, S>(
+        &self,
+        filters: I,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            // Return empty stream if no commits
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters = from_str_tuples(filters)?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Create a stream that iterates through file slices and yields batches
+        let batch_size = options.batch_size;
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                projection: None,
+                row_predicate: None,
+                batch_size,
+                as_of_timestamp: None,
+            };

Review Comment:
   The projection and row_predicate options from the input ReadOptions are 
being ignored. Only batch_size is extracted and used, while projection and 
row_predicate are explicitly set to None when creating a new ReadOptions for 
each file slice. This means users cannot apply column projections or row-level 
filters when using read_snapshot_stream, which breaks the API contract 
suggested by the ReadOptions parameter.
   ```suggestion
               // Clone the caller-provided ReadOptions so that projection,
               // row_predicate, as_of_timestamp, and batch_size are all 
preserved.
               let mut options = options.clone();
               options.batch_size = batch_size;
   ```



##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +327,183 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
     }
 
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, 
log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of 
[FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+        let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+        if base_file_only {
+            // True streaming: return a stream from the parquet file
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a base file as a stream of record batches.
+    ///
+    /// This method streams record batches directly from the parquet file 
without
+    /// loading all data into memory at once.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        // Get default batch size from config
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+
+        let parquet_options = 
ParquetReadOptions::new().with_batch_size(batch_size);
+
+        let hudi_configs = self.hudi_configs.clone();
+        let path = relative_path.to_string();
+
+        // Get the parquet stream
+        let parquet_stream = self
+            .storage
+            .get_parquet_file_stream(&path, parquet_options)
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path 
{path}: {e:?}")))
+            .await?;
+
+        let populates_meta_fields: bool = hudi_configs
+            .get_or_default(HudiTableConfig::PopulatesMetaFields)
+            .into();
+
+        // Create a stream that applies commit time filtering
+        let stream = parquet_stream
+            .into_stream()
+            .filter_map(move |result| {
+                let hudi_configs = hudi_configs.clone();
+                async move {
+                    match result {
+                        Err(e) => Some(Err(ReadFileSliceError(format!(
+                            "Failed to read batch: {e:?}"
+                        )))),
+                        Ok(batch) => {
+                            // Apply commit time filtering if meta fields are 
populated
+                            let filtered = if populates_meta_fields {
+                                match create_commit_time_filter(&hudi_configs, 
&batch) {
+                                    Err(e) => return Some(Err(e)),
+                                    Ok(Some(mask)) => {
+                                        match filter_record_batch(&batch, 
&mask) {
+                                            Ok(b) => b,
+                                            Err(e) => {
+                                                return 
Some(Err(ReadFileSliceError(format!(
+                                                    "Failed to filter records: 
{e:?}"
+                                                ))))
+                                            }
+                                        }
+                                    }
+                                    Ok(None) => batch,
+                                }
+                            } else {
+                                batch
+                            };
+
+                            // Only yield non-empty batches
+                            if filtered.num_rows() > 0 {
+                                Some(Ok(filtered))
+                            } else {
+                                None
+                            }
+                        }
+                    }
+                }
+            });
+
+        Ok(Box::pin(stream))
+    }

Review Comment:
   The projection and row_predicate fields in ReadOptions are defined but never 
actually used in the streaming implementation. The read_base_file_stream 
function only applies commit time filtering but doesn't handle column 
projection (which would need to be passed to ParquetReadOptions.projection as 
column indices) or row-level predicates (which would need to be applied after 
commit time filtering). This makes these ReadOptions fields non-functional in 
the streaming API.



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `filters` - Partition filters to apply.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(empty_filters(), 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream<I, S>(
+        &self,
+        filters: I,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            // Return empty stream if no commits
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters = from_str_tuples(filters)?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Create a stream that iterates through file slices and yields batches
+        let batch_size = options.batch_size;
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                projection: None,
+                row_predicate: None,
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader.read_file_slice_stream(&file_slice, &options).await
+            }
+        });
+
+        // Chain all the streams together
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .filter_map(|result| async move {
+                match result {
+                    Ok(stream) => Some(stream),
+                    Err(e) => {
+                        // Log error and skip this file slice
+                        log::warn!("Failed to read file slice: {:?}", e);
+                        None

Review Comment:
   Errors from individual file slices are silently dropped with only a warning 
logged. This could lead to incomplete or missing data without the caller being 
aware. Consider either propagating the error to the stream (allowing the 
consumer to decide how to handle it) or accumulating errors and returning them 
at the end. If silent skipping is intentional for fault tolerance, this 
behavior should be documented in the function's documentation.
   ```suggestion
               .map(|result| {
                   match result {
                       Ok(stream) => stream,
                       Err(e) => {
                           // Log error and propagate it as an item in the 
output stream
                           log::warn!("Failed to read file slice: {:?}", e);
                           Box::pin(stream::once(async move { Err(e) })) as _
   ```



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `filters` - Partition filters to apply.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(empty_filters(), 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream<I, S>(
+        &self,
+        filters: I,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            // Return empty stream if no commits
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters = from_str_tuples(filters)?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Create a stream that iterates through file slices and yields batches
+        let batch_size = options.batch_size;
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                projection: None,
+                row_predicate: None,
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader.read_file_slice_stream(&file_slice, &options).await
+            }
+        });
+
+        // Chain all the streams together
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .filter_map(|result| async move {
+                match result {
+                    Ok(stream) => Some(stream),
+                    Err(e) => {
+                        // Log error and skip this file slice
+                        log::warn!("Failed to read file slice: {:?}", e);
+                        None
+                    }
+                }
+            })
+            .flatten();
+
+        Ok(Box::pin(combined_stream))
+    }

Review Comment:
   The new streaming APIs (read_file_slice_stream and read_snapshot_stream) 
lack test coverage. The codebase has comprehensive test coverage for similar 
non-streaming read functions in crates/core/tests/table_read_tests.rs. Consider 
adding tests for the streaming APIs that verify basic functionality, error 
handling, and edge cases like empty results and missing timestamps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to