Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659445076
##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,228 @@ mod v8_tables {
}
}
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+ use super::*;
+ use futures::StreamExt;
+ use hudi_core::table::ReadOptions;
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+ for base_url in SampleTable::V6Empty.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+ assert!(batches.is_empty(), "Empty table should produce no batches");
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_basic() -> Result<()> {
+ for base_url in SampleTable::V6Nonpartitioned.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Concatenate batches and verify data
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ );
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Request small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ // With batch_size=1 and 4 rows, we should get multiple batches
+ // (exact number depends on parquet row groups)
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Total rows should match expected count");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new().with_filters([
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ let schema = &batches[0].schema();
Review Comment:
The test accesses `batches[0]` without checking if the vector is non-empty.
While this follows the existing pattern in this test file (see lines 53, 86,
107, etc.), consider adding an assertion like `assert!(!batches.is_empty())`
before this line for better test clarity and error messages, especially since
this is part of new streaming API tests where behavior might differ from the
existing non-streaming APIs.
##########
crates/core/src/storage/mod.rs:
##########
@@ -47,6 +49,62 @@ pub mod file_metadata;
pub mod reader;
pub mod util;
+/// Options for reading Parquet files with streaming.
+#[derive(Clone, Debug, Default)]
+pub struct ParquetReadOptions {
+ /// Target batch size (number of rows per batch).
+ pub batch_size: Option<usize>,
+ /// Column projection (indices of columns to read).
+ pub projection: Option<Vec<usize>>,
+}
+
+impl ParquetReadOptions {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = Some(batch_size);
+ self
+ }
+
+ pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+ self.projection = Some(projection);
+ self
+ }
+}
Review Comment:
The `ParquetReadOptions` struct duplicates some of the same configuration as
`ReadOptions` (batch_size, projection). While `ParquetReadOptions` is for
internal use at the storage layer and `ReadOptions` is the public API, this
duplication means manual mapping between the two. Consider whether
`ParquetReadOptions` could be derived directly from `ReadOptions` to avoid
potential inconsistencies and reduce the need for manual field-by-field mapping.
##########
crates/core/tests/table_read_tests.rs:
##########
@@ -555,6 +555,228 @@ mod v8_tables {
}
}
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+ use super::*;
+ use futures::StreamExt;
+ use hudi_core::table::ReadOptions;
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+ for base_url in SampleTable::V6Empty.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+ assert!(batches.is_empty(), "Empty table should produce no batches");
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_basic() -> Result<()> {
+ for base_url in SampleTable::V6Nonpartitioned.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Concatenate batches and verify data
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ );
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Request small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ // With batch_size=1 and 4 rows, we should get multiple batches
+ // (exact number depends on parquet row groups)
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Total rows should match expected count");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new().with_filters([
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", true),]);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_basic() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Get file slices first
+ let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+ assert!(
+ !file_slices.is_empty(),
+ "Should have at least one file slice"
+ );
+
+ let options = ReadOptions::new();
+ let file_slice = &file_slices[0];
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Verify we got records
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert!(total_rows > 0, "Should read at least one row");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+ let file_slice = &file_slices[0];
+
+ // Test with small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Should read all 4 rows");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+ // Test MOR table with log files - should still work (falls back to collect+merge)
+ let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+ // Verify total row count
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 8, "Should have 8 rows (8 inserts)");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_error_propagation() -> Result<()> {
+ // This test verifies that if we read from a valid table, no errors are propagated
+ // (We can't easily trigger file read errors in a unit test without mocking)
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // All reads should succeed without error
+ while let Some(result) = stream.next().await {
+ assert!(result.is_ok(), "Reading should not produce errors");
+ }
+ Ok(())
+ }
Review Comment:
The test `test_read_snapshot_stream_error_propagation` doesn't actually test
error propagation - it only verifies that reading a valid table doesn't produce
errors. As noted in the comment, it can't easily trigger file read errors
without mocking. This test name is misleading. Consider renaming it to
something like `test_read_snapshot_stream_successful_read` or implementing
actual error propagation testing using integration test fixtures with invalid
data.
##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,157 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+ // =========================================================================
+ // Streaming Read APIs
+ // =========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It returns a stream
+ /// that yields record batches as they are read, without loading all data into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice, &options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let timestamp = match timestamp {
+ Some(ts) => ts.to_string(),
Review Comment:
Calling `.to_string()` on the `&str` held in the `Option<&str>` allocates a new
`String` even though the value is already available as a borrowed reference.
The `timestamp` is immediately converted to an owned string only to be passed
as `&str` to the file group reader. Consider avoiding this allocation by
keeping it as `&str` throughout, or restructure to avoid the unnecessary
conversion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]