Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659332081


##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +323,166 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
     }
 
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, 
log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of 
[FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+
+        if use_read_optimized {
+            return self.read_base_file_stream(base_file_path, options).await;
+        }
+
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        if log_file_paths.is_empty() {
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a base file as a stream of record batches.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        use crate::config::table::BaseFileFormatValue;
+
+        // Validate base file format is parquet
+        let base_file_format: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BaseFileFormat)
+            .into();
+        if 
!base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
+            return Err(ReadFileSliceError(format!(
+                "Streaming read only supports parquet format, got: 
{base_file_format}"
+            )));
+        }
+
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+        let parquet_options = 
ParquetReadOptions::new().with_batch_size(batch_size);
+
+        let hudi_configs = self.hudi_configs.clone();
+        let path = relative_path.to_string();
+
+        let parquet_stream = self
+            .storage
+            .get_parquet_file_stream(&path, parquet_options)
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path 
{path}: {e:?}")))
+            .await?;
+
+        // Apply the same filtering logic as read_file_slice_by_base_file_path
+        let stream = parquet_stream
+            .into_stream()
+            .filter_map(move |result| {
+                let hudi_configs = hudi_configs.clone();
+                async move {
+                    match result {
+                        Err(e) => Some(Err(ReadFileSliceError(format!(
+                            "Failed to read batch: {e:?}"
+                        )))),
+                        Ok(batch) => {
+                            match apply_commit_time_filter(&hudi_configs, 
batch) {
+                                Err(e) => Some(Err(e)),
+                                Ok(filtered) if filtered.num_rows() > 0 => 
Some(Ok(filtered)),
+                                Ok(_) => None,
+                            }
+                        }
+                    }
+                }
+            });
+
+        Ok(Box::pin(stream))
+    }

Review Comment:
   The new streaming APIs lack test coverage. While the existing 
`read_file_slice` method has tests in the file, there are no tests for 
`read_file_slice_stream`, `read_file_slice_from_paths_stream`, or 
`read_base_file_stream`. Given that other similar functions in this file have 
test coverage, these new streaming functions should also be tested to ensure 
they work correctly, especially for edge cases like empty batches, filtering, 
and error conditions.



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,145 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `options` - Read options including partition filters, batch size, 
etc.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new()
+    ///     .with_filters([("city", "=", "san_francisco")])
+    ///     .with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream(
+        &self,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters: Vec<Filter> = options
+            .partition_filters
+            .iter()
+            .map(|(f, o, v)| Filter::try_from((f.as_str(), o.as_str(), 
v.as_str())))
+            .collect::<Result<Vec<_>>>()?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        let batch_size = options.batch_size;
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                partition_filters: vec![],
+                projection: None,
+                row_predicate: None,
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader.read_file_slice_stream(&file_slice, &options).await
+            }
+        });
+
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .filter_map(|result| async move {
+                match result {
+                    Ok(stream) => Some(stream),
+                    Err(e) => {
+                        log::warn!("Failed to read file slice: {:?}", e);
+                        None
+                    }
+                }
+            })
+            .flatten();
+
+        Ok(Box::pin(combined_stream))
+    }

Review Comment:
   The streaming table APIs `read_file_slice_stream` and `read_snapshot_stream` 
lack test coverage. The existing code has comprehensive tests for the 
non-streaming versions (e.g., `read_snapshot`, `get_file_slices`), but there 
are no tests for the new streaming variants. Consider adding tests that verify 
the streaming behavior, including proper handling of multiple batches, empty 
streams, and error conditions.



##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +323,166 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
     }
 
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, 
&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, 
log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of 
[FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+
+        if use_read_optimized {
+            return self.read_base_file_stream(base_file_path, options).await;
+        }
+
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        if log_file_paths.is_empty() {
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a base file as a stream of record batches.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        use crate::config::table::BaseFileFormatValue;
+
+        // Validate base file format is parquet
+        let base_file_format: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BaseFileFormat)
+            .into();
+        if 
!base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
+            return Err(ReadFileSliceError(format!(
+                "Streaming read only supports parquet format, got: 
{base_file_format}"
+            )));
+        }
+
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+        let parquet_options = 
ParquetReadOptions::new().with_batch_size(batch_size);

Review Comment:
   The `read_base_file_stream` method doesn't utilize the `projection` and 
`row_predicate` fields from `ReadOptions`. While batch_size is used, column 
projection could be passed to the ParquetReadOptions to optimize memory usage 
and I/O, and row predicates could be applied to filter batches in the stream. 
Consider implementing these features to provide the full functionality 
advertised by the ReadOptions API.



##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,145 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         todo!("read_incremental_changes")
     }
+
+    // 
=========================================================================
+    // Streaming Read APIs
+    // 
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of reading a single file slice. It 
returns a stream
+    /// that yields record batches as they are read, without loading all data 
into memory.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let file_slices = table.get_file_slices(empty_filters()).await?;
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    ///
+    /// for file_slice in &file_slices {
+    ///     let mut stream = table.read_file_slice_stream(file_slice, 
&options).await?;
+    ///     while let Some(result) = stream.next().await {
+    ///         let batch = result?;
+    ///         // Process batch...
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+        let timestamp = match timestamp {
+            Some(ts) => ts.to_string(),
+            None => return Err(crate::error::CoreError::Timeline("No commit 
timestamp available".to_string())),
+        };
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp.as_str(),
+        )])?;
+
+        fg_reader.read_file_slice_stream(file_slice, options).await
+    }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of 
returning
+    /// all batches at once, it returns a stream that yields record batches as 
they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `options` - Read options including partition filters, batch size, 
etc.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new()
+    ///     .with_filters([("city", "=", "san_francisco")])
+    ///     .with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream(
+        &self,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        use futures::stream::{self, StreamExt};
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters: Vec<Filter> = options
+            .partition_filters
+            .iter()
+            .map(|(f, o, v)| Filter::try_from((f.as_str(), o.as_str(), 
v.as_str())))
+            .collect::<Result<Vec<_>>>()?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        let batch_size = options.batch_size;
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                partition_filters: vec![],
+                projection: None,
+                row_predicate: None,
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader.read_file_slice_stream(&file_slice, &options).await
+            }
+        });
+
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .filter_map(|result| async move {
+                match result {
+                    Ok(stream) => Some(stream),
+                    Err(e) => {
+                        log::warn!("Failed to read file slice: {:?}", e);
+                        None

Review Comment:
   Failed file slice reads are silently logged and skipped using `log::warn!` 
and `filter_map`. This means users won't know if some of their data failed to 
read. Consider either propagating the error to the caller or providing a 
configuration option to control this behavior. Silent data loss can lead to 
incorrect query results without user awareness.
   ```suggestion
               .map(|result| {
                   match result {
                       Ok(stream) => stream,
                       Err(e) => {
                           log::warn!("Failed to read file slice: {:?}", e);
                           stream::once(async { Err(e) }).boxed()
   ```



##########
crates/core/src/file_group/reader.rs:
##########
@@ -425,6 +583,60 @@ impl FileGroupReader {
     }
 }
 
+/// Apply commit time filtering to a record batch.
+///
+/// This function mirrors the filtering logic in 
`FileGroupReader::create_filtering_mask_for_base_file_records`
+/// but takes `HudiConfigs` directly so it can be used in streaming contexts 
where `&self` is not available.
+fn apply_commit_time_filter(hudi_configs: &HudiConfigs, batch: RecordBatch) -> 
Result<RecordBatch> {
+    let populates_meta_fields: bool = hudi_configs
+        .get_or_default(HudiTableConfig::PopulatesMetaFields)
+        .into();
+    if !populates_meta_fields {
+        return Ok(batch);
+    }
+
+    let start_ts = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() });
+    if start_ts.is_none() {
+        // If start timestamp is not provided, the query is snapshot or 
time-travel
+        return Ok(batch);
+    }
+
+    let mut and_filters: Vec<SchemableFilter> = Vec::new();
+    let schema = MetaField::schema();
+
+    if let Some(start) = start_ts {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), ">", 
start.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, 
schema.as_ref()))?);
+    }
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", 
end.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, 
schema.as_ref()))?);
+    }
+
+    if and_filters.is_empty() {
+        return Ok(batch);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = batch
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not 
found")))?;
+        let comparison = filter.apply_comparsion(col_values)?;

Review Comment:
   This line contains a typo in the method name. The method is called 
`apply_comparsion` but it should be `apply_comparison`. This is an existing 
typo in the codebase (also present in the SchemableFilter implementation), but 
since you're now calling it from new code in the `apply_commit_time_filter` 
function, it's worth noting that this function should be renamed for clarity.



##########
crates/core/src/file_group/reader.rs:
##########
@@ -425,6 +583,60 @@ impl FileGroupReader {
     }
 }
 
+/// Apply commit time filtering to a record batch.
+///
+/// This function mirrors the filtering logic in 
`FileGroupReader::create_filtering_mask_for_base_file_records`
+/// but takes `HudiConfigs` directly so it can be used in streaming contexts 
where `&self` is not available.
+fn apply_commit_time_filter(hudi_configs: &HudiConfigs, batch: RecordBatch) -> 
Result<RecordBatch> {
+    let populates_meta_fields: bool = hudi_configs
+        .get_or_default(HudiTableConfig::PopulatesMetaFields)
+        .into();
+    if !populates_meta_fields {
+        return Ok(batch);
+    }
+
+    let start_ts = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() });
+    if start_ts.is_none() {
+        // If start timestamp is not provided, the query is snapshot or 
time-travel
+        return Ok(batch);
+    }
+
+    let mut and_filters: Vec<SchemableFilter> = Vec::new();
+    let schema = MetaField::schema();
+
+    if let Some(start) = start_ts {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), ">", 
start.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, 
schema.as_ref()))?);
+    }
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", 
end.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, 
schema.as_ref()))?);
+    }
+
+    if and_filters.is_empty() {
+        return Ok(batch);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = batch
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not 
found")))?;
+        let comparison = filter.apply_comparsion(col_values)?;
+        mask = and(&mask, &comparison)?;
+    }
+
+    filter_record_batch(&batch, &mask)
+        .map_err(|e| ReadFileSliceError(format!("Failed to filter records: 
{e:?}")))
+}

Review Comment:
   The `apply_commit_time_filter` function duplicates logic from 
`FileGroupReader::create_filtering_mask_for_base_file_records`. While the 
comment acknowledges this, code duplication can lead to maintenance issues if 
one is updated but not the other. Consider refactoring the common logic into a 
shared helper function that both can use, or have 
`create_filtering_mask_for_base_file_records` call `apply_commit_time_filter` 
internally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to