This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a20771d feat: add streaming table and file group APIs (#508)
a20771d is described below
commit a20771d1dafe4fa9b0852360ab69311f425245cf
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 4 15:15:39 2026 -0600
feat: add streaming table and file group APIs (#508)
---
crates/core/src/config/read.rs | 11 +
crates/core/src/expr/filter.rs | 8 +-
crates/core/src/file_group/reader.rs | 634 ++++++++++++++++++++++++++++++----
crates/core/src/storage/mod.rs | 108 ++++++
crates/core/src/table/mod.rs | 152 ++++++++
crates/core/src/table/partition.rs | 2 +-
crates/core/src/table/read_options.rs | 211 +++++++++++
crates/core/tests/table_read_tests.rs | 426 +++++++++++++++++++++++
8 files changed, 1475 insertions(+), 77 deletions(-)
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 04c3296..a5acbc5 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -60,6 +60,10 @@ pub enum HudiReadConfig {
/// When set to true, only [BaseFile]s will be read for optimized reads.
/// This is only applicable to Merge-On-Read (MOR) tables.
UseReadOptimizedMode,
+
+ /// Target number of rows per batch for streaming reads.
+ /// This controls the batch size when using streaming APIs.
+ StreamBatchSize,
}
impl AsRef<str> for HudiReadConfig {
@@ -70,6 +74,7 @@ impl AsRef<str> for HudiReadConfig {
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
Self::UseReadOptimizedMode =>
"hoodie.read.use.read_optimized.mode",
+ Self::StreamBatchSize => "hoodie.read.stream.batch_size",
}
}
}
@@ -88,6 +93,7 @@ impl ConfigParser for HudiReadConfig {
HudiReadConfig::InputPartitions =>
Some(HudiConfigValue::UInteger(0usize)),
HudiReadConfig::ListingParallelism =>
Some(HudiConfigValue::UInteger(10usize)),
HudiReadConfig::UseReadOptimizedMode =>
Some(HudiConfigValue::Boolean(false)),
+ HudiReadConfig::StreamBatchSize =>
Some(HudiConfigValue::UInteger(1024usize)),
_ => None,
}
}
@@ -120,6 +126,11 @@ impl ConfigParser for HudiReadConfig {
bool::from_str(v).map_err(|e| ParseBool(self.key(),
v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
+ Self::StreamBatchSize => get_result
+ .and_then(|v| {
+ usize::from_str(v).map_err(|e| ParseInt(self.key(),
v.to_string(), e))
+ })
+ .map(HudiConfigValue::UInteger),
}
}
}
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 542d45f..060841f 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -193,7 +193,7 @@ impl SchemableFilter {
))
}
- pub fn apply_comparsion(&self, value: &dyn Datum) -> Result<BooleanArray> {
+ pub fn apply_comparison(&self, value: &dyn Datum) -> Result<BooleanArray> {
match self.operator {
ExprOperator::Eq => eq(value, &self.value),
ExprOperator::Ne => neq(value, &self.value),
@@ -289,7 +289,7 @@ mod tests {
let schemable = SchemableFilter::try_from((eq_filter, &schema))?;
let test_array = StringArray::from(vec!["test", "other", "test"]);
- let result = schemable.apply_comparsion(&test_array)?;
+ let result = schemable.apply_comparison(&test_array)?;
assert_eq!(result, BooleanArray::from(vec![true, false, true]));
// Test integer greater than comparison
@@ -301,7 +301,7 @@ mod tests {
let schemable = SchemableFilter::try_from((gt_filter, &schema))?;
let test_array = Int64Array::from(vec![40, 50, 60]);
- let result = schemable.apply_comparsion(&test_array)?;
+ let result = schemable.apply_comparison(&test_array)?;
assert_eq!(result, BooleanArray::from(vec![false, false, true]));
Ok(())
@@ -329,7 +329,7 @@ mod tests {
};
let schemable = SchemableFilter::try_from((filter, &schema))?;
- let result = schemable.apply_comparsion(&test_array)?;
+ let result = schemable.apply_comparison(&test_array)?;
assert_eq!(
result,
BooleanArray::from(expected),
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index defaf7d..005b48f 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -32,13 +32,15 @@ use crate::merge::record_merger::RecordMerger;
use crate::metadata::merger::FilesPartitionMerger;
use crate::metadata::meta_field::MetaField;
use crate::metadata::table_record::FilesPartitionRecord;
-use crate::storage::Storage;
+use crate::storage::{ParquetReadOptions, Storage};
+use crate::table::ReadOptions;
use crate::table::builder::OptionResolver;
use crate::timeline::selector::InstantRange;
use arrow::compute::and;
use arrow::compute::filter_record_batch;
use arrow_array::{BooleanArray, RecordBatch};
-use futures::TryFutureExt;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryFutureExt};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
@@ -108,64 +110,6 @@ impl FileGroupReader {
})
}
- fn create_filtering_mask_for_base_file_records(
- &self,
- records: &RecordBatch,
- ) -> Result<Option<BooleanArray>> {
- let populates_meta_fields: bool = self
- .hudi_configs
- .get_or_default(HudiTableConfig::PopulatesMetaFields)
- .into();
- if !populates_meta_fields {
- // If meta fields are not populated, commit time filtering is not
applicable.
- return Ok(None);
- }
-
- let mut and_filters: Vec<SchemableFilter> = Vec::new();
- let schema = MetaField::schema();
- if let Some(start) = self
- .hudi_configs
- .try_get(HudiReadConfig::FileGroupStartTimestamp)
- .map(|v| -> String { v.into() })
- {
- let filter: Filter =
- Filter::try_from((MetaField::CommitTime.as_ref(), ">",
start.as_str()))?;
- let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
- and_filters.push(filter);
- } else {
- // If start timestamp is not provided, the query is snapshot or
time-travel, so
- // commit time filtering is not needed as the base file being read
is already
- // filtered and selected by the timeline.
- return Ok(None);
- }
-
- if let Some(end) = self
- .hudi_configs
- .try_get(HudiReadConfig::FileGroupEndTimestamp)
- .map(|v| -> String { v.into() })
- {
- let filter = Filter::try_from((MetaField::CommitTime.as_ref(),
"<=", end.as_str()))?;
- let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
- and_filters.push(filter);
- }
-
- if and_filters.is_empty() {
- return Ok(None);
- }
-
- let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
- for filter in &and_filters {
- let col_name = filter.field.name().as_str();
- let col_values = records
- .column_by_name(col_name)
- .ok_or_else(|| ReadFileSliceError(format!("Column {col_name}
not found")))?;
-
- let comparison = filter.apply_comparsion(col_values)?;
- mask = and(&mask, &comparison)?;
- }
- Ok(Some(mask))
- }
-
/// Reads the data from the base file at the given relative path.
///
/// # Arguments
@@ -183,12 +127,7 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to read path
{relative_path}: {e:?}")))
.await?;
- if let Some(mask) =
self.create_filtering_mask_for_base_file_records(&records)? {
- filter_record_batch(&records, &mask)
- .map_err(|e| ReadFileSliceError(format!("Failed to filter
records: {e:?}")))
- } else {
- Ok(records)
- }
+ apply_commit_time_filter(&self.hudi_configs, records)
}
/// Same as [FileGroupReader::read_file_slice_by_base_file_path], but
blocking.
@@ -325,6 +264,173 @@ impl FileGroupReader {
.block_on(self.read_file_slice_from_paths(base_file_path,
log_file_paths))
}
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+    /// Reads a file slice as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice].
+    ///
+    /// How much is actually streamed depends on what has to be read:
+    ///
+    /// - If the file slice has no log files, or read-optimized mode
+    ///   ([HudiReadConfig::UseReadOptimizedMode]) is enabled, only the base parquet
+    ///   file is read and batches are yielded as they are decoded, without loading
+    ///   the whole file into memory.
+    /// - Otherwise (base file plus log files), the slice is read and merged eagerly
+    ///   via [FileGroupReader::read_file_slice_from_paths] and the merged result is
+    ///   yielded as a single batch, because streaming merge of base files with log
+    ///   files is not yet implemented.
+    ///
+    /// # Limitations
+    ///
+    /// - The `projection` and `row_predicate` fields in [ReadOptions] are not yet
+    ///   implemented for streaming reads. Only `batch_size` is currently supported.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is `'static`.
+    ///
+    /// # Errors
+    /// Returns an error if the relative path of the base file or of any log file
+    /// cannot be resolved, or if opening/reading the underlying files fails.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice_stream(&file_slice, &options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_stream(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+
+        // Resolve every log file to its relative path, stopping at the first failure.
+        // An empty list selects the base-file-only streaming path downstream.
+        let mut log_file_paths: Vec<String> = Vec::new();
+        if file_slice.has_log_file() {
+            for log_file in file_slice.log_files.iter() {
+                log_file_paths.push(file_slice.log_file_relative_path(log_file)?);
+            }
+        }
+
+        self.read_file_slice_from_paths_stream(&base_file_path, log_file_paths, options)
+            .await
+    }
+
+    /// Reads a file slice, given as relative paths, as a stream of record batches.
+    ///
+    /// This is the streaming version of [FileGroupReader::read_file_slice_from_paths].
+    /// The base file is streamed batch by batch when read-optimized mode is enabled
+    /// (the log files are then ignored entirely) or when `log_file_paths` is empty.
+    /// Otherwise the base file and log files are read and merged eagerly and the
+    /// merged result is yielded as a single batch.
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file.
+    /// * `log_file_paths` - Iterator of relative paths to log files.
+    /// * `options` - Read options for configuring the read operation.
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    pub async fn read_file_slice_from_paths_stream<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+
+        // Read-optimized mode never merges log files, so the iterator is left untouched.
+        let owned_log_paths: Vec<String> = if read_optimized {
+            Vec::new()
+        } else {
+            log_file_paths
+                .into_iter()
+                .map(|path| path.as_ref().to_owned())
+                .collect()
+        };
+
+        if owned_log_paths.is_empty() {
+            return self.read_base_file_stream(base_file_path, options).await;
+        }
+
+        // Streaming merge with log files is not implemented yet: merge the whole slice,
+        // then expose the single merged batch as a one-item stream.
+        let merged = self
+            .read_file_slice_from_paths(base_file_path, owned_log_paths)
+            .await?;
+        let only_item: Result<RecordBatch> = Ok(merged);
+        Ok(Box::pin(futures::stream::iter([only_item])))
+    }
+
+    /// Reads a base file as a stream of record batches.
+    ///
+    /// Every decoded batch goes through the same commit time filtering as
+    /// [FileGroupReader::read_file_slice_by_base_file_path]. Batches left with no rows
+    /// after filtering are dropped from the stream rather than yielded empty.
+    ///
+    /// The batch size is taken from [ReadOptions::batch_size] when set, otherwise
+    /// from [HudiReadConfig::StreamBatchSize].
+    ///
+    /// # Limitations
+    ///
+    /// Currently only `batch_size` from [ReadOptions] is used. The `projection` and
+    /// `row_predicate` fields are not yet implemented.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the table's base file format is not parquet, if the
+    /// resolved batch size is zero, or if the parquet file cannot be opened. Failures
+    /// while reading or filtering individual batches are yielded as `Err` items.
+    async fn read_base_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        use crate::config::table::BaseFileFormatValue;
+
+        // Streaming is built on the async parquet reader, so only parquet is supported.
+        let base_file_format: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BaseFileFormat)
+            .into();
+        if !base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
+            return Err(ReadFileSliceError(format!(
+                "Streaming read only supports parquet format, got: {base_file_format}"
+            )));
+        }
+
+        let default_batch_size: usize = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::StreamBatchSize)
+            .into();
+        let batch_size = options.batch_size.unwrap_or(default_batch_size);
+        // A batch size of zero is never meaningful; reject it explicitly instead of
+        // leaving its handling to the parquet reader.
+        if batch_size == 0 {
+            return Err(ReadFileSliceError(
+                "Stream batch size must be greater than 0".to_string(),
+            ));
+        }
+        let parquet_options = ParquetReadOptions::new().with_batch_size(batch_size);
+
+        let parquet_stream = self
+            .storage
+            .get_parquet_file_stream(relative_path, parquet_options)
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path {relative_path}: {e:?}")))
+            .await?;
+
+        // The returned stream is 'static, so it keeps its own handle to the configs.
+        let hudi_configs = self.hudi_configs.clone();
+        let stream = parquet_stream.into_stream().filter_map(move |result| {
+            // Filtering is synchronous: resolving each item into a ready future lets the
+            // closure borrow the configs instead of cloning them once per batch.
+            let item = match result {
+                Err(e) => Some(Err(ReadFileSliceError(format!(
+                    "Failed to read batch: {e:?}"
+                )))),
+                Ok(batch) => match apply_commit_time_filter(&hudi_configs, batch) {
+                    Err(e) => Some(Err(e)),
+                    // Drop batches the commit time filter emptied.
+                    Ok(filtered) if filtered.num_rows() == 0 => None,
+                    Ok(filtered) => Some(Ok(filtered)),
+                },
+            };
+            futures::future::ready(item)
+        });
+
+        Ok(Box::pin(stream))
+    }
+
//
=========================================================================
// Metadata Table File Slice Reading
//
=========================================================================
@@ -425,6 +531,70 @@ impl FileGroupReader {
}
}
+/// Creates a commit time filtering mask for `batch` based on the provided configs.
+///
+/// The mask selects rows whose [MetaField::CommitTime] is greater than
+/// [HudiReadConfig::FileGroupStartTimestamp] and, when
+/// [HudiReadConfig::FileGroupEndTimestamp] is set, less than or equal to it.
+///
+/// Returns `None` if no filtering is needed (meta fields disabled or no start timestamp).
+///
+/// # Errors
+/// Returns an error if a filter cannot be built against the meta field schema, if the
+/// commit time column is missing from `batch`, or if a comparison fails.
+fn create_commit_time_filter_mask(
+    hudi_configs: &HudiConfigs,
+    batch: &RecordBatch,
+) -> Result<Option<BooleanArray>> {
+    let populates_meta_fields: bool = hudi_configs
+        .get_or_default(HudiTableConfig::PopulatesMetaFields)
+        .into();
+    if !populates_meta_fields {
+        return Ok(None);
+    }
+
+    // If start timestamp is not provided, the query is snapshot or time-travel, and
+    // the base file being read was already selected by the timeline.
+    let Some(start) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupStartTimestamp)
+        .map(|v| -> String { v.into() })
+    else {
+        return Ok(None);
+    };
+
+    let schema = MetaField::schema();
+    // At most two filters: the mandatory start bound and an optional end bound.
+    let mut and_filters: Vec<SchemableFilter> = Vec::with_capacity(2);
+
+    let start_filter = Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?;
+    and_filters.push(SchemableFilter::try_from((start_filter, schema.as_ref()))?);
+
+    if let Some(end) = hudi_configs
+        .try_get(HudiReadConfig::FileGroupEndTimestamp)
+        .map(|v| -> String { v.into() })
+    {
+        let end_filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?;
+        and_filters.push(SchemableFilter::try_from((end_filter, schema.as_ref()))?);
+    }
+
+    let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
+    for filter in &and_filters {
+        let col_name = filter.field.name().as_str();
+        let col_values = batch
+            .column_by_name(col_name)
+            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
+        let comparison = filter.apply_comparison(col_values)?;
+        mask = and(&mask, &comparison)?;
+    }
+
+    Ok(Some(mask))
+}
+
+/// Applies commit time filtering to a record batch.
+///
+/// When [create_commit_time_filter_mask] reports that no filtering is needed, `batch`
+/// is returned unchanged; otherwise only the rows selected by the mask are kept.
+fn apply_commit_time_filter(
+    hudi_configs: &HudiConfigs,
+    batch: RecordBatch,
+) -> Result<RecordBatch> {
+    let Some(mask) = create_commit_time_filter_mask(hudi_configs, &batch)? else {
+        return Ok(batch);
+    };
+    filter_record_batch(&batch, &mask)
+        .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -533,7 +703,7 @@ mod tests {
}
#[test]
- fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
+ fn test_create_commit_time_filter_mask() -> Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
let records = create_test_record_batch()?;
@@ -545,12 +715,12 @@ mod tests {
(HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
],
)?;
- let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None, "Commit time filtering should not be needed");
// Test case 2: No commit time filtering options
let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
- let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None);
// Test case 3: Filtering commit time > '2'
@@ -558,7 +728,7 @@ mod tests {
&base_uri,
[(HudiReadConfig::FileGroupStartTimestamp, "2")],
)?;
- let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(
mask,
Some(BooleanArray::from(vec![false, false, true, true, true])),
@@ -570,7 +740,7 @@ mod tests {
&base_uri,
[(HudiReadConfig::FileGroupEndTimestamp, "4")],
)?;
- let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None, "Commit time filtering should not be needed");
// Test case 5: Filtering commit time > '2' and <= '4'
@@ -581,7 +751,7 @@ mod tests {
(HudiReadConfig::FileGroupEndTimestamp, "4"),
],
)?;
- let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(
mask,
Some(BooleanArray::from(vec![false, false, true, true, false])),
@@ -733,6 +903,326 @@ mod tests {
Ok(())
}
+    // =========================================================================
+    // Streaming API Tests
+    // =========================================================================
+
+    /// Helper to create a FileGroupReader without using block_on (safe for async tests).
+    fn create_test_reader(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs, empty_options())
+    }
+
+    /// Helper to create a FileGroupReader with read-optimized mode enabled.
+    fn create_test_reader_read_optimized(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(
+            hudi_configs,
+            [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
+        )
+    }
+
+    /// Helper to create a FileGroupReader whose commit time filter keeps ("2", "4"].
+    fn create_test_reader_with_commit_time_filter(base_uri: &str) -> Result<FileGroupReader> {
+        let hudi_configs = Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_uri)]));
+        FileGroupReader::new_with_configs_and_overwriting_options(
+            hudi_configs,
+            [
+                (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
+                (HudiReadConfig::FileGroupEndTimestamp.as_ref(), "4"),
+            ],
+        )
+    }
+
+    /// Drains a record batch stream, returning all batches or the first error.
+    async fn collect_stream_batches(
+        mut stream: futures::stream::BoxStream<'static, Result<RecordBatch>>,
+    ) -> Result<Vec<RecordBatch>> {
+        use futures::StreamExt;
+
+        let mut batches = Vec::new();
+        while let Some(batch) = stream.next().await {
+            batches.push(batch?);
+        }
+        Ok(batches)
+    }
+
+    /// Asserts that a streaming read failed because a data file is missing.
+    ///
+    /// The streaming tests point at sample files that may be absent from the test
+    /// data, so a file-not-found error is an accepted outcome for them.
+    fn assert_file_not_found_error(err: &impl std::fmt::Display) {
+        let error_msg = err.to_string();
+        assert!(
+            error_msg.contains("Failed to read path")
+                || error_msg.contains("not found")
+                || error_msg.contains("No such file")
+                || error_msg.contains("Object at location"),
+            "Expected file not found error, got: {error_msg}"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_base_file_only() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+        let options = ReadOptions::default();
+
+        match reader.read_file_slice_stream(&file_slice, &options).await {
+            Ok(stream) => {
+                let batches = collect_stream_batches(stream).await?;
+                assert!(!batches.is_empty(), "Should produce at least one batch");
+                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert!(total_rows > 0, "Should read at least one row");
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_base_file_only() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let log_file_paths: Vec<&str> = Vec::new();
+        let options = ReadOptions::default();
+
+        match reader
+            .read_file_slice_from_paths_stream(TEST_SAMPLE_BASE_FILE, log_file_paths, &options)
+            .await
+        {
+            Ok(stream) => {
+                let batches = collect_stream_batches(stream).await?;
+                assert!(!batches.is_empty(), "Should produce at least one batch");
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_read_optimized_mode() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader_read_optimized(&base_uri)?;
+
+        // Even with log files, read-optimized mode should ignore them.
+        let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
+        let options = ReadOptions::default();
+
+        match reader
+            .read_file_slice_from_paths_stream(TEST_SAMPLE_BASE_FILE, log_file_paths, &options)
+            .await
+        {
+            Ok(stream) => {
+                let batches = collect_stream_batches(stream).await?;
+                assert!(
+                    !batches.is_empty(),
+                    "Should produce batches in read-optimized mode"
+                );
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_stream_with_log_files() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
+        let options = ReadOptions::default();
+
+        match reader
+            .read_file_slice_from_paths_stream(TEST_SAMPLE_BASE_FILE, log_file_paths, &options)
+            .await
+        {
+            Ok(stream) => {
+                // With log files, the reader falls back to collect + merge.
+                let batches = collect_stream_batches(stream).await?;
+                assert_eq!(
+                    batches.len(),
+                    1,
+                    "Should produce exactly one batch in fallback mode"
+                );
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+
+        // Use the smallest valid batch size so every batch holds at most one row.
+        let options = ReadOptions {
+            batch_size: Some(1),
+            ..ReadOptions::default()
+        };
+
+        match reader.read_file_slice_stream(&file_slice, &options).await {
+            Ok(stream) => {
+                let batches = collect_stream_batches(stream).await?;
+                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert!(total_rows > 0, "Should read at least one row");
+                assert!(
+                    batches.iter().all(|b| b.num_rows() <= 1),
+                    "Batches should not exceed the requested batch size of 1"
+                );
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_error_on_invalid_file() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader(&base_uri)?;
+
+        // Use a valid file name format but pointing to a non-existent file.
+        let base_file = BaseFile::from_str(
+            "00000000-0000-0000-0000-000000000000-0_0-0-0_00000000000000000.parquet",
+        )?;
+        let file_slice = FileSlice::new(base_file, String::new());
+        let options = ReadOptions::default();
+
+        let Err(e) = reader.read_file_slice_stream(&file_slice, &options).await else {
+            panic!("Should return error for non-existent file");
+        };
+        assert_file_not_found_error(&e);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_slice_stream_with_commit_time_filtering() -> Result<()> {
+        let base_uri = get_base_uri_with_valid_props_minimum();
+        let reader = create_test_reader_with_commit_time_filter(&base_uri)?;
+
+        let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
+        let file_slice = FileSlice::new(base_file, String::new());
+        let options = ReadOptions::default();
+
+        match reader.read_file_slice_stream(&file_slice, &options).await {
+            Ok(stream) => {
+                let batches = collect_stream_batches(stream).await?;
+                // read_base_file_stream drops batches that the commit time filter
+                // emptied, so every yielded batch must carry rows.
+                assert!(
+                    batches.iter().all(|b| b.num_rows() > 0),
+                    "Streamed batches should never be empty after commit time filtering"
+                );
+            }
+            Err(e) => assert_file_not_found_error(&e),
+        }
+
+        Ok(())
+    }
+
//
=========================================================================
// Metadata Table File Slice Reading Tests
//
=========================================================================
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 5f4b019..14af978 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -24,9 +24,11 @@ use std::sync::Arc;
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::StreamExt;
+use futures::stream::BoxStream;
use object_store::path::Path as ObjPath;
use object_store::{ObjectStore, parse_url_opts};
use parquet::arrow::async_reader::ParquetObjectReader;
@@ -47,6 +49,70 @@ pub mod file_metadata;
pub mod reader;
pub mod util;
+/// Options for reading Parquet files with streaming.
+///
+/// Fields left as `None` are not applied to the underlying parquet reader, so its
+/// own defaults stay in effect.
+#[derive(Clone, Debug, Default)]
+pub struct ParquetReadOptions {
+    /// Target batch size (number of rows per batch); `None` keeps the reader default.
+    pub batch_size: Option<usize>,
+    /// Column projection, as indices of the file's root (top-level) columns;
+    /// `None` applies no projection.
+    pub projection: Option<Vec<usize>>,
+}
+
+impl ParquetReadOptions {
+    /// Creates options with neither a batch size nor a projection set.
+    #[must_use]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the target number of rows per batch.
+    #[must_use]
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    /// Restricts reading to the given root column indices.
+    #[must_use]
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+}
+
+/// A stream of record batches read from a single Parquet file, together with the
+/// file's Arrow schema.
+///
+/// Returned by [Storage::get_parquet_file_stream]; batches are produced as the stream
+/// is polled. The inner stream is boxed (one heap allocation per opened file), which
+/// keeps this type `'static` and independent of the concrete reader type.
+pub struct ParquetFileStream {
+    schema: SchemaRef,
+    stream: BoxStream<'static, std::result::Result<RecordBatch, parquet::errors::ParquetError>>,
+}
+
+impl std::fmt::Debug for ParquetFileStream {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // The boxed stream is opaque, so only the schema is shown.
+        f.debug_struct("ParquetFileStream")
+            .field("schema", &self.schema)
+            .finish_non_exhaustive()
+    }
+}
+
+impl ParquetFileStream {
+    /// Returns the Arrow schema of the Parquet file.
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Consumes self and returns the inner stream.
+    pub fn into_stream(
+        self,
+    ) -> BoxStream<'static, std::result::Result<RecordBatch, parquet::errors::ParquetError>> {
+        self.stream
+    }
+}
+
+impl futures::Stream for ParquetFileStream {
+    type Item = std::result::Result<RecordBatch, parquet::errors::ParquetError>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.stream.as_mut().poll_next(cx)
+    }
+
+    /// Forwards the inner stream's size hint instead of the default `(0, None)`.
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        futures::Stream::size_hint(&self.stream)
+    }
+}
+
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Storage {
@@ -206,6 +272,48 @@ impl Storage {
Ok(concat_batches(&schema, &batches)?)
}
+    /// Opens a streaming reader for a Parquet file.
+    ///
+    /// Returns a [ParquetFileStream] that yields record batches as they are read,
+    /// without loading all data into memory at once.
+    ///
+    /// # Arguments
+    /// * `relative_path` - The relative path to the Parquet file.
+    /// * `options` - Options for reading the Parquet file (batch size, projection).
+    ///
+    /// # Errors
+    /// Returns an error if the path cannot be resolved, if the object metadata or
+    /// parquet metadata cannot be read, or if a projection index is out of range for
+    /// the file's root columns.
+    pub async fn get_parquet_file_stream(
+        &self,
+        relative_path: &str,
+        options: ParquetReadOptions,
+    ) -> Result<ParquetFileStream> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let obj_store = self.object_store.clone();
+        let meta = obj_store.head(&obj_path).await?;
+
+        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
+        let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+
+        if let Some(batch_size) = options.batch_size {
+            builder = builder.with_batch_size(batch_size);
+        }
+
+        if let Some(projection) = options.projection {
+            // `ProjectionMask::roots` indexes a root-column mask directly and panics on an
+            // out-of-range index, so reject bad indices with an error first.
+            let num_root_columns = builder.parquet_schema().root_schema().get_fields().len();
+            if let Some(&bad_index) = projection.iter().find(|&&idx| idx >= num_root_columns) {
+                return Err(parquet::errors::ParquetError::General(format!(
+                    "Projection index {bad_index} out of range: {relative_path} has {num_root_columns} root columns"
+                ))
+                .into());
+            }
+            let projection_mask =
+                parquet::arrow::ProjectionMask::roots(builder.parquet_schema(), projection);
+            builder = builder.with_projection(projection_mask);
+        }
+
+        let schema = builder.schema().clone();
+        let stream = builder.build()?;
+
+        Ok(ParquetFileStream {
+            schema,
+            stream: Box::pin(stream),
+        })
+    }
+
pub async fn get_storage_reader(&self, relative_path: &str) ->
Result<StorageReader> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 80a00d7..b85c2b5 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -91,8 +91,11 @@ pub mod builder;
mod fs_view;
mod listing;
pub mod partition;
+mod read_options;
mod validation;
+pub use read_options::ReadOptions;
+
use crate::Result;
use crate::config::HudiConfigs;
use crate::config::read::HudiReadConfig;
@@ -790,6 +793,155 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+    // =========================================================================
+    // Streaming Read APIs
+    // =========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It
returns a stream
+ /// that yields record batches as they are read, without loading all data
into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice,
&options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ use futures::stream;
+
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let Some(timestamp) = timestamp else {
+ // No commit timestamp means empty table - return empty stream
(consistent with read_snapshot)
+ return Ok(Box::pin(stream::empty()));
+ };
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ fg_reader.read_file_slice_stream(file_slice, options).await
+ }
+
+    /// Reads the table snapshot as a stream of record batches.
+    ///
+    /// This is the streaming version of [Table::read_snapshot]. Instead of returning
+    /// all batches at once, it returns a stream that yields record batches as they
+    /// are read from the underlying file slices.
+    ///
+    /// # Arguments
+    /// * `options` - Read options including partition filters, batch size,
+    ///   projection, and an optional as-of timestamp for time travel.
+    ///
+    /// # Returns
+    /// A stream of record batches from all file slices. Errors from individual file
+    /// slices are propagated through the stream.
+    ///
+    /// The read is performed as of `options.as_of_timestamp` when set, and otherwise
+    /// as of the latest commit. If neither is available (an empty table), the stream
+    /// is empty.
+    ///
+    /// # Errors
+    /// Returns an error before any batch is produced if a partition filter cannot be
+    /// parsed, file slices cannot be listed, or the file group reader cannot be created.
+    ///
+    /// # Limitations
+    ///
+    /// - The `row_predicate` field in [ReadOptions] is not yet implemented for streaming reads.
+    /// - For MOR tables with log files, streaming falls back to the collect-and-merge approach.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    /// use hudi::table::ReadOptions;
+    ///
+    /// let options = ReadOptions::new()
+    ///     .with_filters([("city", "=", "san_francisco")])
+    ///     .with_batch_size(4096);
+    /// let mut stream = table.read_snapshot_stream(&options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     println!("Read {} rows", batch.num_rows());
+    /// }
+    /// ```
+    pub async fn read_snapshot_stream(
+        &self,
+        options: &ReadOptions,
+    ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+        use futures::stream::{self, StreamExt};
+
+        // Resolve the snapshot timestamp exactly like `read_file_slice_stream`: an
+        // explicit as-of timestamp takes precedence over the latest commit, so
+        // time-travel options are not silently ignored.
+        let timestamp = options
+            .as_of_timestamp
+            .as_deref()
+            .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+        let Some(timestamp) = timestamp else {
+            // No commit timestamp means an empty table (consistent with read_snapshot).
+            return Ok(Box::pin(stream::empty()));
+        };
+
+        let filters: Vec<Filter> = options
+            .partition_filters
+            .iter()
+            .map(|(f, o, v)| Filter::try_from((f.as_str(), o.as_str(), v.as_str())))
+            .collect::<Result<Vec<_>>>()?;
+        let file_slices = self.get_file_slices_internal(timestamp, &filters).await?;
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(stream::empty()));
+        }
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Per-slice options: partition filters were already applied when listing the
+        // slices, and the end timestamp is configured on `fg_reader`.
+        // Note: row_predicate is not yet supported in streaming base file reads.
+        let batch_size = options.batch_size;
+        let projection = options.projection.clone();
+
+        let streams_iter = file_slices.into_iter().map(move |file_slice| {
+            let fg_reader = fg_reader.clone();
+            let options = ReadOptions {
+                partition_filters: vec![],
+                projection: projection.clone(),
+                row_predicate: None, // Not yet implemented in streaming reads
+                batch_size,
+                as_of_timestamp: None,
+            };
+            async move {
+                fg_reader
+                    .read_file_slice_stream(&file_slice, &options)
+                    .await
+            }
+        });
+
+        // Open the slices one after another and flatten their batches into a single
+        // stream. A failure to open a slice's stream becomes a single `Err` item.
+        let combined_stream = stream::iter(streams_iter)
+            .then(|fut| fut)
+            .flat_map(|result| match result {
+                Ok(file_stream) => file_stream.left_stream(),
+                Err(e) => stream::once(async move { Err(e) }).right_stream(),
+            });
+
+        Ok(Box::pin(combined_stream))
+    }
}
#[cfg(test)]
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 069dbde..79aa32d 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -118,7 +118,7 @@ impl PartitionPruner {
self.and_filters.iter().all(|filter| {
match segments.get(filter.field.name()) {
Some(segment_value) => {
- match filter.apply_comparsion(segment_value) {
+ match filter.apply_comparison(segment_value) {
Ok(scalar) => scalar.value(0),
Err(_) => true, // Include the partition when
comparison error occurs
}
diff --git a/crates/core/src/table/read_options.rs
b/crates/core/src/table/read_options.rs
new file mode 100644
index 0000000..499546d
--- /dev/null
+++ b/crates/core/src/table/read_options.rs
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Read options for streaming reads.
+
+use arrow_array::{BooleanArray, RecordBatch};
+
+/// Row-level filter evaluated against a record batch. It returns a boolean mask
+/// over the batch's rows (presumably one entry per row).
+pub type RowPredicate =
+    Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+
+/// Partition filter expressed as a `(field_name, operator, value)` tuple,
+/// e.g. `("city", "=", "san_francisco")`.
+pub type PartitionFilter = (String, String, String);
+
+/// Configuration for the streaming read APIs.
+///
+/// Available settings:
+/// - partition filters, which prune partitions before reading;
+/// - a column projection, naming the columns to read;
+/// - a row-level predicate for filtering records;
+/// - a target batch size (rows per batch);
+/// - an as-of timestamp for time travel.
+///
+/// # Current Limitations
+///
+/// Streaming APIs do not yet honor every option:
+/// - `batch_size` and `partition_filters` are fully supported.
+/// - `projection` is forwarded to the streaming implementation but is not yet
+///   applied at the Parquet read level. Mapping column names to column indices via
+///   a schema lookup is not yet implemented.
+/// - `row_predicate` is not yet implemented for streaming reads; a predicate can be
+///   set but is ignored.
+///
+/// # Example
+///
+/// ```ignore
+/// use hudi::table::ReadOptions;
+///
+/// let options = ReadOptions::new()
+///     .with_filters([("city", "=", "san_francisco")])
+///     .with_batch_size(4096);
+/// ```
+#[derive(Default)]
+pub struct ReadOptions {
+    /// Partition filters, each a `(field, operator, value)` tuple.
+    pub partition_filters: Vec<PartitionFilter>,
+
+    /// Names of the columns to read; `None` reads every column.
+    pub projection: Option<Vec<String>>,
+
+    /// Row-level filter predicate, intended to be applied to each batch after it is
+    /// read. Currently ignored by streaming reads (see "Current Limitations").
+    pub row_predicate: Option<RowPredicate>,
+
+    /// Target number of rows per batch; `None` requests no explicit batch size.
+    pub batch_size: Option<usize>,
+
+    /// Timestamp for time-travel (as-of) queries.
+    pub as_of_timestamp: Option<String>,
+}
+
+impl ReadOptions {
+    /// Returns options with every setting unset: no filters, no projection, no
+    /// predicate, no batch size and no as-of timestamp.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Replaces the partition filters.
+    ///
+    /// # Arguments
+    /// * `filters` - `(field, operator, value)` tuples; each element is converted to `String`.
+    pub fn with_filters<I, S1, S2, S3>(self, filters: I) -> Self
+    where
+        I: IntoIterator<Item = (S1, S2, S3)>,
+        S1: Into<String>,
+        S2: Into<String>,
+        S3: Into<String>,
+    {
+        let partition_filters = filters
+            .into_iter()
+            .map(|(field, op, value)| (field.into(), op.into(), value.into()))
+            .collect();
+        Self {
+            partition_filters,
+            ..self
+        }
+    }
+
+    /// Sets which columns to read.
+    ///
+    /// # Arguments
+    /// * `columns` - Names of the columns to project.
+    pub fn with_projection<I, S>(self, columns: I) -> Self
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        let names: Vec<String> = columns.into_iter().map(|column| column.into()).collect();
+        Self {
+            projection: Some(names),
+            ..self
+        }
+    }
+
+    /// Sets the row-level predicate used to filter records.
+    ///
+    /// **Note:** Streaming reads do not yet evaluate row predicates; the predicate is
+    /// stored but ignored during streaming operations.
+    ///
+    /// # Arguments
+    /// * `predicate` - Maps a RecordBatch to a BooleanArray mask.
+    pub fn with_row_predicate<F>(self, predicate: F) -> Self
+    where
+        F: Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync + 'static,
+    {
+        Self {
+            row_predicate: Some(Box::new(predicate)),
+            ..self
+        }
+    }
+
+    /// Sets the target number of rows per batch.
+    ///
+    /// # Arguments
+    /// * `size` - Desired rows per batch.
+    pub fn with_batch_size(self, size: usize) -> Self {
+        Self {
+            batch_size: Some(size),
+            ..self
+        }
+    }
+
+    /// Sets the timestamp used for time-travel (as-of) queries.
+    ///
+    /// # Arguments
+    /// * `timestamp` - Timestamp to read the table as of.
+    pub fn with_as_of_timestamp<S: Into<String>>(self, timestamp: S) -> Self {
+        Self {
+            as_of_timestamp: Some(timestamp.into()),
+            ..self
+        }
+    }
+}
+
+impl std::fmt::Debug for ReadOptions {
+    // Hand-written because `RowPredicate` is a boxed closure without `Debug`. Field
+    // labels mirror the struct's field names, as a derived impl would, and the
+    // predicate is shown only as a placeholder.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ReadOptions")
+            .field("partition_filters", &self.partition_filters)
+            .field("projection", &self.projection)
+            .field(
+                "row_predicate",
+                &self.row_predicate.as_ref().map(|_| "<predicate>"),
+            )
+            .field("batch_size", &self.batch_size)
+            .field("as_of_timestamp", &self.as_of_timestamp)
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_new_has_no_settings() {
+        let options = ReadOptions::new();
+
+        assert!(options.partition_filters.is_empty());
+        assert!(options.projection.is_none());
+        assert!(options.row_predicate.is_none());
+        assert!(options.batch_size.is_none());
+        assert!(options.as_of_timestamp.is_none());
+    }
+
+    #[test]
+    fn test_with_filters() {
+        let options = ReadOptions::new().with_filters([("city", "=", "sf"), ("id", ">", "3")]);
+
+        assert_eq!(
+            options.partition_filters,
+            vec![
+                ("city".to_string(), "=".to_string(), "sf".to_string()),
+                ("id".to_string(), ">".to_string(), "3".to_string()),
+            ]
+        );
+    }
+
+    #[test]
+    fn test_with_filters_replaces_previous_filters() {
+        let options = ReadOptions::new()
+            .with_filters([("city", "=", "sf")])
+            .with_filters([("id", "<", "10")]);
+
+        assert_eq!(
+            options.partition_filters,
+            vec![("id".to_string(), "<".to_string(), "10".to_string())]
+        );
+    }
+
+    #[test]
+    fn test_with_projection() {
+        let options = ReadOptions::new().with_projection(["col1", "col2", "col3"]);
+
+        assert_eq!(
+            options.projection,
+            Some(vec![
+                "col1".to_string(),
+                "col2".to_string(),
+                "col3".to_string()
+            ])
+        );
+    }
+
+    #[test]
+    fn test_with_row_predicate() {
+        let options = ReadOptions::new()
+            .with_row_predicate(|batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])));
+
+        assert!(options.row_predicate.is_some());
+    }
+
+    #[test]
+    fn test_with_batch_size() {
+        let options = ReadOptions::new().with_batch_size(4096);
+
+        assert_eq!(options.batch_size, Some(4096));
+    }
+
+    #[test]
+    fn test_with_as_of_timestamp() {
+        let options = ReadOptions::new().with_as_of_timestamp("20240101120000000");
+
+        assert_eq!(
+            options.as_of_timestamp,
+            Some("20240101120000000".to_string())
+        );
+    }
+
+    #[test]
+    fn test_builders_preserve_other_settings() {
+        let options = ReadOptions::new()
+            .with_batch_size(10)
+            .with_projection(["id"])
+            .with_as_of_timestamp("20240101120000000");
+
+        assert_eq!(options.batch_size, Some(10));
+        assert_eq!(options.projection, Some(vec!["id".to_string()]));
+        assert_eq!(
+            options.as_of_timestamp,
+            Some("20240101120000000".to_string())
+        );
+    }
+
+    #[test]
+    fn test_debug_format() {
+        let options = ReadOptions::new()
+            .with_filters([("city", "=", "sf")])
+            .with_projection(["id"])
+            .with_batch_size(1000)
+            .with_row_predicate(|batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])));
+
+        let debug_str = format!("{options:?}");
+        assert!(debug_str.contains("ReadOptions"));
+        assert!(debug_str.contains("filters"));
+        assert!(debug_str.contains("projection"));
+        assert!(debug_str.contains("batch_size"));
+        assert!(debug_str.contains("<predicate>"));
+    }
+}
diff --git a/crates/core/tests/table_read_tests.rs
b/crates/core/tests/table_read_tests.rs
index 218f7b3..9b36a6d 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -553,6 +553,432 @@ mod v8_tables {
Ok(())
}
}
+
+ /// Streaming query tests for v8 tables
+ mod streaming_queries {
+ use super::*;
+ use futures::StreamExt;
+ use hudi_core::table::ReadOptions;
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+ let base_url = SampleTable::V8Empty.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+ assert!(batches.is_empty(), "Empty table should produce no
batches");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_basic() -> Result<()> {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Concatenate batches and verify data
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ );
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Request small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ // With batch_size=1 and 4 rows, we expect multiple batches, but
the
+ // exact number depends on both the batch_size setting and the
Parquet
+ // file's internal row group structure.
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Total rows should match expected
count");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_partition_filters() ->
Result<()> {
+ let base_url = SampleTable::V8ComplexkeygenHivestyle.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new().with_filters([
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(
+ !batches.is_empty(),
+ "Should produce at least one batch for the given partition
filters"
+ );
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol",
true),]);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_basic() -> Result<()> {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Get file slices first
+ let file_slices =
hudi_table.get_file_slices(empty_filters()).await?;
+ assert!(
+ !file_slices.is_empty(),
+ "Should have at least one file slice"
+ );
+
+ let options = ReadOptions::new();
+ let file_slice = &file_slices[0];
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Verify we got records
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert!(total_rows > 0, "Should read at least one row");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let file_slices =
hudi_table.get_file_slices(empty_filters()).await?;
+ let file_slice = &file_slices[0];
+
+ // Test with small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Should read all 4 rows");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+ // Test MOR table with log files - should still work (falls back
to collect+merge)
+ // V8Trips8I3U1D: 8 inserts, 3 updates (A, J, G fare=0), 2 deletes
(F, J)
+ let base_url =
QuickstartTripsTable::V8Trips8I3U1D.url_to_mor_avro();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce batches from MOR
table");
+
+ // Verify total row count - should have 6 rows (8 inserts - 2
deletes)
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 6, "Should have 6 rows (8 inserts - 2
deleted)");
+
+ // Verify deleted riders are not present
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+ let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records);
+ let riders: Vec<_> = uuid_rider_and_fare
+ .iter()
+ .map(|(_, rider, _)| rider.as_str())
+ .collect();
+
+ let deleted_riders = ["rider-F", "rider-J"];
+ assert!(
+ riders.iter().all(|rider| !deleted_riders.contains(rider)),
+ "Deleted riders should not be present in streaming results"
+ );
+
+ Ok(())
+ }
+ }
+}
+
+/// Test module for streaming read APIs.
+/// These tests verify the streaming versions of snapshot and file slice reads.
+mod streaming_queries {
+ use super::*;
+ use futures::StreamExt;
+ use hudi_core::table::ReadOptions;
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_empty_table() -> Result<()> {
+ for base_url in SampleTable::V6Empty.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+ assert!(batches.is_empty(), "Empty table should produce no
batches");
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_basic() -> Result<()> {
+ for base_url in SampleTable::V6Nonpartitioned.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Concatenate batches and verify data
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ );
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Request small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ // With batch_size=1 and 4 rows, we expect multiple batches, but the
+ // exact number depends on both the batch_size setting and the Parquet
+ // file's internal row group structure.
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Total rows should match expected count");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_partition_filters() -> Result<()> {
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new().with_filters([
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(
+ !batches.is_empty(),
+ "Should produce at least one batch for the given partition filters"
+ );
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol",
true),]);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_basic() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Get file slices first
+ let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+ assert!(
+ !file_slices.is_empty(),
+ "Should have at least one file slice"
+ );
+
+ let options = ReadOptions::new();
+ let file_slice = &file_slices[0];
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ // Collect all batches from stream
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Verify we got records
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert!(total_rows > 0, "Should read at least one row");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_slice_stream_with_batch_size() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
+ let file_slice = &file_slices[0];
+
+ // Test with small batch size
+ let options = ReadOptions::new().with_batch_size(1);
+ let mut stream = hudi_table
+ .read_file_slice_stream(file_slice, &options)
+ .await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 4, "Should read all 4 rows");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_mor_with_log_files() -> Result<()> {
+ // Test MOR table with log files - should still work (falls back to
collect+merge)
+ let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce batches from MOR table");
+
+ // Verify total row count
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 8, "Should have 8 rows (8 inserts)");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_successful_read() -> Result<()> {
+ // This test verifies that reading from a valid table succeeds without
errors.
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // All reads should succeed without error
+ while let Some(result) = stream.next().await {
+ assert!(result.is_ok(), "Reading should not produce errors");
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_empty_table_no_timestamp() ->
Result<()> {
+ // For an empty table with no commit timestamp, streaming reads should
return
+ // an empty stream (consistent with read_snapshot behavior).
+ let base_url = SampleTable::V6Empty.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ let options = ReadOptions::new();
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut count = 0;
+ while (stream.next().await).is_some() {
+ count += 1;
+ }
+ assert_eq!(count, 0, "Empty table should produce no batches");
+ Ok(())
+ }
}
/// Test module for tables with metadata table (MDT) enabled.