Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659409268


##########
crates/core/src/table/read_options.rs:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Read options for streaming reads.
+
+use arrow_array::{BooleanArray, RecordBatch};
+
+/// A row-level predicate function for filtering records.
+pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+
+/// A partition filter tuple: (field_name, operator, value).
+/// Example: ("city", "=", "san_francisco")
+pub type PartitionFilter = (String, String, String);
+
+/// Options for reading file slices with streaming APIs.
+///
+/// This struct provides configuration for:
+/// - Partition filters (filtering partitions)
+/// - Column projection (which columns to read)
+/// - Row-level predicates (filtering rows)
+/// - Batch size control (rows per batch)
+/// - Time travel (as-of timestamp)
+///
+/// # Current Limitations
+///
+/// Not all options are supported in all streaming APIs:
+/// - `batch_size` and `partition_filters` are fully supported.
+/// - `projection` is passed through but not yet applied at the parquet read level.
+/// - `row_predicate` is not yet implemented in streaming reads.

Review Comment:
   The documentation states that `projection` is "passed through but not yet 
applied at the parquet read level", but `read_base_file_stream` contains no 
code that applies the projection at all, not even when building the parquet 
reader. There is also a type mismatch: the `ParquetReadOptions` struct has a 
`projection` field that accepts column indices, while `ReadOptions::projection` 
holds column names, so the projection cannot be passed through without a schema 
lookup that maps column names to indices. Consider either implementing the 
projection functionality or removing the `projection` field from `ReadOptions` 
in this PR, since it currently has no effect.
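
A minimal sketch of the name-to-index lookup mentioned above, using plain string slices in place of an Arrow schema so it stays self-contained. `resolve_projection` is a hypothetical helper for illustration, not an existing hudi-rs function, and rejecting unknown column names is an assumed policy.

```rust
/// Hypothetical helper (not part of hudi-rs): map projected column names to
/// their positions in the file schema, preserving the requested order.
/// Returns an error for any name that is not present in the schema.
fn resolve_projection(schema_fields: &[&str], projection: &[&str]) -> Result<Vec<usize>, String> {
    projection
        .iter()
        .map(|name| {
            schema_fields
                .iter()
                .position(|field| field == name)
                .ok_or_else(|| format!("column `{}` not found in schema", name))
        })
        .collect()
}
```

The resulting indices are in the form the comment says `ParquetReadOptions` accepts; in real code the field names would come from the file's schema rather than a hard-coded slice.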



##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,228 @@ mod v8_tables {
     }
 }
 
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+    use super::*;
+    use futures::StreamExt;
+    use hudi_core::table::ReadOptions;
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_basic() -> Result<()> {
+        for base_url in SampleTable::V6Nonpartitioned.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // With batch_size=1 and 4 rows, we should get multiple batches
+        // (exact number depends on parquet row groups)
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Total rows should match expected count");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new().with_filters([
+            ("byteField", ">=", "10"),
+            ("byteField", "<", "20"),
+            ("shortField", "!=", "100"),
+        ]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+

Review Comment:
   This test accesses `batches[0]` without checking if the batches vector is 
empty. If the partition filters exclude all data, this will panic with an index 
out of bounds error. Add a check to ensure batches is not empty before 
accessing the first element, similar to the pattern used in 
`test_read_snapshot_stream_basic`.
   ```suggestion
   
           assert!(
               !batches.is_empty(),
               "Should produce at least one batch for the given partition filters"
           );
   ```



##########
crates/core/src/table/read_options.rs:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Read options for streaming reads.
+
+use arrow_array::{BooleanArray, RecordBatch};
+
+/// A row-level predicate function for filtering records.
+pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+
+/// A partition filter tuple: (field_name, operator, value).
+/// Example: ("city", "=", "san_francisco")
+pub type PartitionFilter = (String, String, String);
+
+/// Options for reading file slices with streaming APIs.
+///
+/// This struct provides configuration for:
+/// - Partition filters (filtering partitions)
+/// - Column projection (which columns to read)
+/// - Row-level predicates (filtering rows)
+/// - Batch size control (rows per batch)
+/// - Time travel (as-of timestamp)
+///
+/// # Current Limitations
+///
+/// Not all options are supported in all streaming APIs:
+/// - `batch_size` and `partition_filters` are fully supported.
+/// - `projection` is passed through but not yet applied at the parquet read level.
+/// - `row_predicate` is not yet implemented in streaming reads.
+///
+/// # Example
+///
+/// ```ignore
+/// use hudi::table::ReadOptions;
+///
+/// let options = ReadOptions::new()
+///     .with_filters([("city", "=", "san_francisco")])
+///     .with_batch_size(4096);
+/// ```
+#[derive(Default)]
+pub struct ReadOptions {
+    /// Partition filters. Each filter is a tuple of (field, operator, value).
+    pub partition_filters: Vec<PartitionFilter>,
+
+    /// Column names to project (select). If None, all columns are read.
+    pub projection: Option<Vec<String>>,
+
+    /// Row-level filter predicate. Applied after reading each batch.
+    pub row_predicate: Option<RowPredicate>,

Review Comment:
   The struct-level documentation says `row_predicate` is "not yet implemented 
in streaming reads", yet the field is part of the `ReadOptions` struct, its own 
doc comment says it is "Applied after reading each batch", and it has a setter 
method. Users may set it and expect it to take effect. Consider either 
implementing `row_predicate` support or stating more clearly, in both the 
struct-level docs and the `with_row_predicate` method doc, that this feature is 
not yet supported in streaming contexts, to prevent confusion.
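
If the first option is taken, the per-batch step could look roughly like the sketch below. It uses plain slices and a `Vec<bool>` mask as stand-ins for `RecordBatch` and `BooleanArray`, and both function names are hypothetical; with Arrow types, the filtering step would normally be delegated to `arrow::compute::filter_record_batch`.

```rust
/// Keep only the rows whose mask entry is `true`.
/// Stand-in for filtering a batch with a boolean array.
fn filter_rows<T: Clone>(rows: &[T], mask: &[bool]) -> Result<Vec<T>, String> {
    if rows.len() != mask.len() {
        return Err(format!(
            "mask length {} does not match row count {}",
            mask.len(),
            rows.len()
        ));
    }
    Ok(rows
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(row, _)| row.clone())
        .collect())
}

/// Hypothetical per-batch step: evaluate the optional predicate to get a
/// mask, then filter. Without a predicate the batch passes through unchanged.
fn apply_row_predicate<T: Clone>(
    rows: Vec<T>,
    predicate: Option<&dyn Fn(&[T]) -> Result<Vec<bool>, String>>,
) -> Result<Vec<T>, String> {
    match predicate {
        Some(pred) => {
            let mask = pred(&rows)?;
            filter_rows(&rows, &mask)
        }
        None => Ok(rows),
    }
}
```

Predicate errors are propagated rather than swallowed, which mirrors the `crate::Result<BooleanArray>` return type of `RowPredicate`.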



##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,228 @@ mod v8_tables {
     }
 }
 
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+    use super::*;
+    use futures::StreamExt;
+    use hudi_core::table::ReadOptions;
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_basic() -> Result<()> {
+        for base_url in SampleTable::V6Nonpartitioned.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // With batch_size=1 and 4 rows, we should get multiple batches
+        // (exact number depends on parquet row groups)
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Total rows should match expected count");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new().with_filters([
+            ("byteField", ">=", "10"),
+            ("byteField", "<", "20"),
+            ("shortField", "!=", "100"),
+        ]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        let schema = &batches[0].schema();
+        let records = concat_batches(schema, &batches)?;
+
+        let sample_data = SampleTable::sample_data_order_by_id(&records);
+        assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_basic() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Get file slices first
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        assert!(
+            !file_slices.is_empty(),
+            "Should have at least one file slice"
+        );
+
+        let options = ReadOptions::new();
+        let file_slice = &file_slices[0];
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        // Verify we got records
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert!(total_rows > 0, "Should read at least one row");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+        let file_slice = &file_slices[0];
+
+        // Test with small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table
+            .read_file_slice_stream(file_slice, &options)
+            .await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Should read all 4 rows");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+        // Test MOR table with log files - should still work (falls back to collect+merge)
+        let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+        // Verify total row count
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 8, "Should have 8 rows (8 inserts)");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_error_propagation() -> Result<()> {
+        // This test verifies that if we read from a valid table, no errors are propagated
+        // (We can't easily trigger file read errors in a unit test without mocking)
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // All reads should succeed without error
+        while let Some(result) = stream.next().await {
+            assert!(result.is_ok(), "Reading should not produce errors");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_no_timestamp_error() -> Result<()> {
+        // For an empty table, read_file_slice_stream should error if called with
+        // a file slice that doesn't have a valid timestamp context
+        let base_url = SampleTable::V6Empty.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Create a dummy file slice - this should fail because there's no commit timestamp
+        // Note: We can't easily test this without creating an invalid file slice
+        // So we verify the empty table returns empty stream from read_snapshot_stream
+        let options = ReadOptions::new();
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut count = 0;
+        while (stream.next().await).is_some() {
+            count += 1;
+        }
+        assert_eq!(count, 0, "Empty table should produce no batches");
+        Ok(())
+    }

Review Comment:
   The test name `test_read_file_slice_stream_no_timestamp_error` and its 
comment suggest it's testing error handling for file slices without valid 
timestamps. However, the test actually just verifies that an empty table 
produces no batches - it doesn't create or test an invalid file slice. Either 
rename the test to accurately reflect what it tests (e.g., 
`test_read_snapshot_stream_empty_table_produces_no_batches`) or implement the 
actual timestamp error test case as described in the comment.



##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,228 @@ mod v8_tables {
     }
 }
 
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+    use super::*;
+    use futures::StreamExt;
+    use hudi_core::table::ReadOptions;
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+            assert!(batches.is_empty(), "Empty table should produce no batches");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_basic() -> Result<()> {
+        for base_url in SampleTable::V6Nonpartitioned.urls() {
+            let hudi_table = Table::new(base_url.path()).await?;
+            let options = ReadOptions::new();
+            let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+            // Collect all batches from stream
+            let mut batches = Vec::new();
+            while let Some(result) = stream.next().await {
+                batches.push(result?);
+            }
+
+            assert!(!batches.is_empty(), "Should produce at least one batch");
+
+            // Concatenate batches and verify data
+            let schema = &batches[0].schema();
+            let records = concat_batches(schema, &batches)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", false),
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true),
+                ]
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request small batch size
+        let options = ReadOptions::new().with_batch_size(1);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Collect all batches from stream
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // With batch_size=1 and 4 rows, we should get multiple batches
+        // (exact number depends on parquet row groups)

Review Comment:
   The comment says the exact number "depends on parquet row groups" without 
explaining how, which is easy to misread. The `batch_size` setting controls how 
many rows are returned per batch during streaming reads, but the Parquet file's 
internal row group structure is still respected, so a batch may be smaller than 
`batch_size` where a row group ends. Consider clarifying this comment to explain 
that the exact number of batches depends on both the `batch_size` setting and 
the Parquet file's internal row group structure.
   ```suggestion
           // With batch_size=1 and 4 rows, we expect multiple batches, but the
           // exact number and boundaries depend on both the batch_size setting
           // and the Parquet file's internal row group structure.
   ```
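
To make the dependency concrete, here is a rough sketch of the arithmetic, assuming that `batch_size` caps each batch and that no batch spans two row groups. Both points are assumptions about the reader, not verified hudi-rs behavior, and `expected_batch_count` is a hypothetical helper.

```rust
/// Number of batches a reader would emit if `batch_size` caps each batch and
/// no batch spans two row groups (assumed reader behavior, for illustration).
fn expected_batch_count(row_group_rows: &[usize], batch_size: usize) -> usize {
    assert!(batch_size > 0, "batch_size must be positive");
    row_group_rows
        .iter()
        // Ceiling division: the last batch of a row group may be short.
        .map(|rows| (rows + batch_size - 1) / batch_size)
        .sum()
}
```

Under these assumptions a single row group of 4 rows read with `batch_size=1` gives 4 batches, while two row groups of 3 rows read with `batch_size=2` give 4 batches rather than the 3 that the total row count alone would suggest. Asserting on the total row count, as the test does, avoids depending on either detail.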


