Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659455386


##########
crates/core/src/table/read_options.rs:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Read options for streaming reads.
+
+use arrow_array::{BooleanArray, RecordBatch};
+
+/// A row-level predicate function for filtering records.
+pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+
+/// A partition filter tuple: (field_name, operator, value).
+/// Example: ("city", "=", "san_francisco")
+pub type PartitionFilter = (String, String, String);
+
+/// Options for reading file slices with streaming APIs.
+///
+/// This struct provides configuration for:
+/// - Partition filters (filtering partitions)
+/// - Column projection (which columns to read)
+/// - Row-level predicates (filtering rows)
+/// - Batch size control (rows per batch)
+/// - Time travel (as-of timestamp)
+///
+/// # Current Limitations
+///
+/// Not all options are fully supported in streaming APIs:
+/// - `batch_size` and `partition_filters` are fully supported.
+/// - `projection` is passed through to the streaming implementation but is not yet
+///   applied at the parquet read level. This is because projection requires mapping
+///   column names to column indices via schema lookup, which is not yet implemented.
+/// - `row_predicate` is not yet implemented in streaming reads. The predicate function
+///   is accepted but will be ignored.
+///
+/// # Example
+///
+/// ```ignore
+/// use hudi::table::ReadOptions;
+///
+/// let options = ReadOptions::new()
+///     .with_filters([("city", "=", "san_francisco")])
+///     .with_batch_size(4096);
+/// ```
+#[derive(Default)]
+pub struct ReadOptions {
+    /// Partition filters. Each filter is a tuple of (field, operator, value).
+    pub partition_filters: Vec<PartitionFilter>,
+
+    /// Column names to project (select). If None, all columns are read.
+    pub projection: Option<Vec<String>>,
+
+    /// Row-level filter predicate. Applied after reading each batch.
+    pub row_predicate: Option<RowPredicate>,
+
+    /// Target number of rows per batch.
+    pub batch_size: Option<usize>,
+
+    /// Timestamp for time travel queries (as-of).
+    pub as_of_timestamp: Option<String>,
+}

Review Comment:
   The `ReadOptions` struct fields are public, but the documentation at lines 
39-47 notes that `projection` is not yet applied at the parquet read level and 
that `row_predicate` is not yet implemented. Because the struct still accepts 
both, callers can set them and get no effect. Consider either:
   1. Making these fields private with getter methods to prevent misuse, or
   2. Adding runtime validation that returns an error if these unimplemented 
options are used, or
   3. More prominently documenting that these fields are accepted but silently 
ignored
   
   The current approach of silently ignoring options is not ideal from an API 
design perspective, as users may set these options expecting them to work.
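
As an illustration of option 2, a minimal sketch of runtime validation might look like the following. `StreamReadOptions` and `validate_for_streaming` are hypothetical names used only here; the struct is a simplified stand-in for the PR's `ReadOptions`, and a plain `String` error stands in for the crate's error type.

```rust
// Hypothetical, simplified stand-in for the PR's `ReadOptions`. The real
// struct also carries partition filters, a boxed predicate, and a timestamp.
#[derive(Default)]
pub struct StreamReadOptions {
    pub projection: Option<Vec<String>>,
    pub has_row_predicate: bool,
    pub batch_size: Option<usize>,
}

/// Rejects options that the streaming path would otherwise silently ignore.
pub fn validate_for_streaming(options: &StreamReadOptions) -> Result<(), String> {
    if options.projection.is_some() {
        return Err("`projection` is not yet supported in streaming reads".to_string());
    }
    if options.has_row_predicate {
        return Err("`row_predicate` is not yet supported in streaming reads".to_string());
    }
    Ok(())
}
```

Calling such a check at the start of each streaming entry point would turn a silently ignored option into an explicit error, at the cost of breaking callers that already pass these fields.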



##########
crates/core/src/storage/mod.rs:
##########
@@ -47,6 +49,62 @@ pub mod file_metadata;
 pub mod reader;
 pub mod util;
 
+/// Options for reading Parquet files with streaming.
+#[derive(Clone, Debug, Default)]
+pub struct ParquetReadOptions {
+    /// Target batch size (number of rows per batch).
+    pub batch_size: Option<usize>,
+    /// Column projection (indices of columns to read).
+    pub projection: Option<Vec<usize>>,
+}
+
+impl ParquetReadOptions {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+}
+
+/// A stream of record batches from a Parquet file with its schema.
+pub struct ParquetFileStream {
+    schema: SchemaRef,
+    stream: BoxStream<'static, std::result::Result<RecordBatch, parquet::errors::ParquetError>>,
+}

Review Comment:
   The `ParquetFileStream` struct owns a `BoxStream` which requires heap 
allocation for each file. For high-performance scenarios with many small files, 
this could add overhead. Consider documenting this trade-off, or potentially 
providing a non-boxed variant in the future if performance profiling shows this 
to be a bottleneck. This is not a blocking issue, but worth noting for future 
optimization.
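
The trade-off can be sketched by analogy with standard-library iterators. This is not the PR's code: `Iterator` stands in for `Stream`, `u32` stands in for `RecordBatch`, and all type and function names below are hypothetical.

```rust
// Analogy only: `Iterator` stands in for `Stream`, `u32` for `RecordBatch`.

/// Type-erased variant: one heap allocation and dynamic dispatch per reader.
pub struct BoxedBatches {
    inner: Box<dyn Iterator<Item = u32>>,
}

/// Generic variant: the concrete iterator is stored inline, with no boxing.
pub struct GenericBatches<I: Iterator<Item = u32>> {
    inner: I,
}

impl Iterator for BoxedBatches {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.inner.next()
    }
}

impl<I: Iterator<Item = u32>> Iterator for GenericBatches<I> {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.inner.next()
    }
}

pub fn boxed(rows: Vec<u32>) -> BoxedBatches {
    BoxedBatches { inner: Box::new(rows.into_iter()) }
}

pub fn generic(rows: Vec<u32>) -> GenericBatches<std::vec::IntoIter<u32>> {
    GenericBatches { inner: rows.into_iter() }
}
```

The generic form avoids the allocation but exposes the concrete inner type in the public signature, which is one reason a boxed stream is a common default for library APIs.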



##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +323,173 @@ impl FileGroupReader {
             .block_on(self.read_file_slice_from_paths(base_file_path, log_file_paths))
     }
 
+    // =========================================================================
+    // Streaming Read APIs
+    // =========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    /// It returns a stream that yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a true
+    /// streaming iterator from the underlying parquet file, yielding batches as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the collect-and-merge approach
+    /// and yields the merged result as a single batch. This limitation exists because
+    /// streaming merge of base files with log files is not yet implemented.
+    ///
+    /// # Limitations
+    ///
+    /// - The `projection` and `row_predicate` fields in [ReadOptions] are not yet
+    ///   implemented for streaming reads. Only `batch_size` is currently supported.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is `'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, &options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        self.read_file_slice_from_paths_stream(&base_file_path, log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice from paths as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice_from_paths].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+
+        if use_read_optimized {
+            return self.read_base_file_stream(base_file_path, options).await;
+        }
+
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        if log_file_paths.is_empty() {
+            self.read_base_file_stream(base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths(base_file_path, log_file_paths)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a base file as a stream of record batches.
+    ///
+    /// # Limitations
+    ///
+    /// Currently only `batch_size` from [ReadOptions] is used. The `projection` and
+    /// `row_predicate` fields are not yet implemented.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        use crate::config::table::BaseFileFormatValue;
+
+        // Validate base file format is parquet
+        let base_file_format: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BaseFileFormat)
+            .into();
+        if !base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
+            return Err(ReadFileSliceError(format!(
+                "Streaming read only supports parquet format, got: {base_file_format}"
+            )));
+        }
+
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+        let parquet_options = ParquetReadOptions::new().with_batch_size(batch_size);
+
+        let hudi_configs = self.hudi_configs.clone();
+        let path = relative_path.to_string();
+
+        let parquet_stream = self
+            .storage
+            .get_parquet_file_stream(&path, parquet_options)
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path {path}: {e:?}")))
+            .await?;
+
+        // Apply the same filtering logic as read_file_slice_by_base_file_path
+        let stream = parquet_stream.into_stream().filter_map(move |result| {
+            let hudi_configs = hudi_configs.clone();
+            async move {
+                match result {
+                    Err(e) => Some(Err(ReadFileSliceError(format!(
+                        "Failed to read batch: {e:?}"
+                    )))),
+                    Ok(batch) => match apply_commit_time_filter(&hudi_configs, batch) {
+                        Err(e) => Some(Err(e)),
+                        Ok(filtered) if filtered.num_rows() > 0 => Some(Ok(filtered)),
+                        Ok(_) => None,
+                    },
+                }
+            }
+        });

Review Comment:
   The `filter_map` closure at line 474 clones `hudi_configs` (an `Arc`) for 
every batch so that each `async move` block owns its own handle. `Arc::clone` 
is cheap (it only increments a reference count), so the overhead is minimal and 
the current implementation is correct. Because `apply_commit_time_filter` is 
synchronous, the clone could be avoided by computing the result directly in the 
closure and returning an already-completed future. Consider this a minor 
optimization opportunity rather than a required change.
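
A minimal sketch of the clone-free pattern, using a standard-library iterator in place of the stream: the closure takes ownership of the `Arc` once and only borrows it per item. `Configs`, `keep_commit`, and `filter_commits` are hypothetical stand-ins, not names from the PR.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `HudiConfigs`: only a lower commit-time bound.
pub struct Configs {
    pub start_ts: u64,
}

// Synchronous filter, mirroring the shape of `apply_commit_time_filter`.
fn keep_commit(configs: &Configs, commit_ts: u64) -> Option<u64> {
    if commit_ts > configs.start_ts {
        Some(commit_ts)
    } else {
        None
    }
}

/// The closure owns the `Arc` once and borrows it for each item, so the
/// reference count is not touched while items are processed.
pub fn filter_commits(configs: Arc<Configs>, commits: Vec<u64>) -> Vec<u64> {
    commits
        .into_iter()
        .filter_map(move |ts| keep_commit(&configs, ts))
        .collect()
}
```

With `futures::StreamExt::filter_map`, the same closure body could plausibly be wrapped in `futures::future::ready(...)`, since the closure only needs to return a future that resolves to an `Option`.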



##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,229 @@ mod v8_tables {
     }
 }
 
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+    use super::*;
+    use futures::StreamExt;
+    use hudi_core::table::ReadOptions;
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_basic() -> Result<()> {
+        for base_url in SampleTable::V6Nonpartitioned.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // With batch_size=1 and 4 rows, we expect multiple batches, but the
+        // exact number depends on both the batch_size setting and the Parquet
+        // file's internal row group structure.
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Total rows should match expected count");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new().with_filters([
+            ("byteField", ">=", "10"),
+            ("byteField", "<", "20"),
+            ("shortField", "!=", "100"),
+        ]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(
+            !batches.is_empty(),
+            "Should produce at least one batch for the given partition filters"
+        );
+        let schema = &batches[0].schema();
+        let records = concat_batches(schema, &batches)?;
+
+        let sample_data = SampleTable::sample_data_order_by_id(&records);
+        assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_basic() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Get file slices first
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        assert!(
+            !file_slices.is_empty(),
+            "Should have at least one file slice"
+        );
+
+        let options = ReadOptions::new();
+        let file_slice = &file_slices[0];
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        // Verify we got records
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert!(total_rows > 0, "Should read at least one row");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        let file_slice = &file_slices[0];
+
+        // Test with small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Should read all 4 rows");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+        // Test MOR table with log files - should still work (falls back to collect+merge)
+        let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+        // Verify total row count
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 8, "Should have 8 rows (8 inserts)");
+        Ok(())
+    }

Review Comment:
   The test `test_read_snapshot_stream_mor_with_log_files` (line 725) checks 
that MOR tables with log files can be read through the streaming API. The 
comment on line 726 says the read "falls back to collect+merge", but the test 
only checks that the row count is correct; it does not verify that the fallback 
was taken. Consider asserting that only a single batch is returned (indicating 
the fallback occurred), or add a comment explaining that the fallback behavior 
is tested implicitly.
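
One possible shape for such a check is sketched below. It assumes the test reads a single file slice through `read_file_slice_stream`, where the fallback in the diff yields one merged batch via `stream::once`; a snapshot read spanning several file slices may legitimately produce more batches. The helper name and the use of plain row counts are hypothetical.

```rust
/// Returns true when the collected batches look like the collect-and-merge
/// fallback: exactly one batch that holds every expected row.
/// `batch_row_counts` is a hypothetical stand-in for
/// `batches.iter().map(|b| b.num_rows())` in the real test.
pub fn looks_like_merge_fallback(batch_row_counts: &[usize], expected_rows: usize) -> bool {
    batch_row_counts.len() == 1 && batch_row_counts[0] == expected_rows
}
```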



##########
crates/core/src/file_group/reader.rs:
##########
@@ -425,6 +590,60 @@ impl FileGroupReader {
     }
 }
 
+/// Apply commit time filtering to a record batch.
+///
+/// This function mirrors the filtering logic in `FileGroupReader::create_filtering_mask_for_base_file_records`
+/// but takes `HudiConfigs` directly so it can be used in streaming contexts where `&self` is not available.
+fn apply_commit_time_filter(hudi_configs: &HudiConfigs, batch: RecordBatch) -> Result<RecordBatch> {
+    let populates_meta_fields: bool = hudi_configs
+        .get_or_default(HudiTableConfig::PopulatesMetaFields)
+        .into();
+    if !populates_meta_fields {
+        return Ok(batch);
+    }
+
+    let start_ts = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() });
+    if start_ts.is_none() {
+        // If start timestamp is not provided, the query is snapshot or time-travel
+        return Ok(batch);
+    }
+
+    let mut and_filters: Vec<SchemableFilter> = Vec::new();
+    let schema = MetaField::schema();
+
+    if let Some(start) = start_ts {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, schema.as_ref()))?);
+    }
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((filter, schema.as_ref()))?);
+    }
+
+    if and_filters.is_empty() {
+        return Ok(batch);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = batch
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
+        let comparison = filter.apply_comparison(col_values)?;
+        mask = and(&mask, &comparison)?;
+    }
+

Review Comment:
   The `apply_commit_time_filter` function seeds the mask with an all-`true` 
`BooleanArray` (line 633) and then ANDs every comparison into it. The empty 
`and_filters` case already returns early, so this allocation only happens when 
filters exist, but it and one `and` call per batch can still be avoided by 
using the first comparison as the initial mask. This is a minor optimization 
that could help when processing many batches.
   ```suggestion
       let mut mask: Option<BooleanArray> = None;
       for filter in &and_filters {
           let col_name = filter.field.name().as_str();
           let col_values = batch
               .column_by_name(col_name)
               .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
           let comparison = filter.apply_comparison(col_values)?;
           mask = Some(match mask {
               None => comparison,
               Some(existing) => and(&existing, &comparison)?,
           });
       }
   
       let mask = match mask {
           Some(m) => m,
           None => return Ok(batch),
       };
   ```
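
The `Option`-seeded fold in the suggestion can be tried in isolation with plain vectors. In this sketch `Vec<bool>` is a stand-in for Arrow's `BooleanArray`, and the hypothetical helper `and_masks` stands in for the `and` compute kernel; neither is the PR's code.

```rust
// `Vec<bool>` stands in for `BooleanArray`, and `and_masks` for the Arrow
// `and` kernel used in the PR. Both are illustrative assumptions.
fn and_masks(a: &[bool], b: &[bool]) -> Vec<bool> {
    a.iter().zip(b).map(|(x, y)| *x && *y).collect()
}

/// Combines per-filter masks without allocating an all-true seed.
/// Returns `None` when there are no filters, meaning every row is kept.
pub fn combine_masks(comparisons: Vec<Vec<bool>>) -> Option<Vec<bool>> {
    let mut mask: Option<Vec<bool>> = None;
    for comparison in comparisons {
        mask = Some(match mask {
            // The first comparison becomes the mask as-is.
            None => comparison,
            // Later comparisons are ANDed into the running mask.
            Some(existing) => and_masks(&existing, &comparison),
        });
    }
    mask
}
```

With N filters this performs N - 1 AND operations instead of N and skips the initial all-true allocation.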



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.