Copilot commented on code in PR #508:
URL: https://github.com/apache/hudi-rs/pull/508#discussion_r2659311067
##########
crates/core/src/file_group/reader.rs:
##########
@@ -425,6 +604,54 @@ impl FileGroupReader {
}
}
+/// Helper function to create a commit time filter mask for a record batch.
+/// This is used by the streaming read APIs.
+fn create_commit_time_filter(
+ hudi_configs: &HudiConfigs,
+ records: &RecordBatch,
+) -> Result<Option<BooleanArray>> {
+ let mut and_filters: Vec<SchemableFilter> = Vec::new();
+ let schema = MetaField::schema();
+
+ if let Some(start) = hudi_configs
+ .try_get(HudiReadConfig::FileGroupStartTimestamp)
+ .map(|v| -> String { v.into() })
+ {
+ let filter: Filter =
+ Filter::try_from((MetaField::CommitTime.as_ref(), ">",
start.as_str()))?;
+ let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+ and_filters.push(filter);
+ } else {
+ // If start timestamp is not provided, no filtering needed
+ return Ok(None);
+ }
+
+ if let Some(end) = hudi_configs
+ .try_get(HudiReadConfig::FileGroupEndTimestamp)
+ .map(|v| -> String { v.into() })
+ {
+ let filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=",
end.as_str()))?;
+ let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+ and_filters.push(filter);
+ }
+
+ if and_filters.is_empty() {
+ return Ok(None);
+ }
+
+ let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
+ for filter in &and_filters {
+ let col_name = filter.field.name().as_str();
+ let col_values = records
+ .column_by_name(col_name)
+ .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not
found")))?;
+
+ let comparison = filter.apply_comparsion(col_values)?;
Review Comment:
Typo in method name: 'apply_comparsion' should be 'apply_comparison'. This
typo exists in the existing codebase (line 165) and is propagated here.
##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +327,183 @@ impl FileGroupReader {
.block_on(self.read_file_slice_from_paths(base_file_path,
log_file_paths))
}
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of [FileGroupReader::read_file_slice].
+ /// It returns a stream that yields record batches as they are read.
+ ///
+ /// For COW tables or read-optimized mode (base file only), this returns a
true
+ /// streaming iterator from the underlying parquet file, yielding batches
as they
+ /// are read without loading all data into memory.
+ ///
+ /// For MOR tables with log files, this falls back to the
collect-and-merge approach
+ /// and yields the merged result as a single batch.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches. The stream owns all necessary data and is
`'static`.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = reader.read_file_slice_stream(&file_slice,
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
+
+ self.read_file_slice_from_paths_stream(&base_file_path,
log_file_paths, options)
+ .await
+ }
+
+ /// Reads a file slice from paths as a stream of record batches.
+ ///
+ /// This is the streaming version of
[FileGroupReader::read_file_slice_from_paths].
+ ///
+ /// # Arguments
+ /// * `base_file_path` - Relative path to the base file.
+ /// * `log_file_paths` - Iterator of relative paths to log files.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ pub async fn read_file_slice_from_paths_stream<I, S>(
+ &self,
+ base_file_path: &str,
+ log_file_paths: I,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<str>,
+ {
+ let log_file_paths: Vec<String> = log_file_paths
+ .into_iter()
+ .map(|s| s.as_ref().to_string())
+ .collect();
+
+ let use_read_optimized: bool = self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+ .into();
+ let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+ if base_file_only {
+ // True streaming: return a stream from the parquet file
+ self.read_base_file_stream(base_file_path, options).await
+ } else {
+ // Fallback: collect + merge, then yield as single-item stream
+ let batch = self
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await?;
+ Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+ }
Review Comment:
For MOR tables with log files, the streaming implementation falls back to
the non-streaming read_file_slice_from_paths method and returns the entire
merged result as a single batch. This defeats the purpose of streaming for MOR
tables and could cause high memory usage with large file slices. Consider
documenting this limitation prominently in the function documentation, or
implementing proper streaming merge support.
##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It
returns a stream
+ /// that yields record batches as they are read, without loading all data
into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice,
&options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let timestamp = match timestamp {
+ Some(ts) => ts.to_string(),
+ None => return Err(crate::error::CoreError::Timeline("No commit
timestamp available".to_string())),
+ };
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp.as_str(),
+ )])?;
+
+ fg_reader.read_file_slice_stream(file_slice, options).await
+ }
+
+ /// Reads the table snapshot as a stream of record batches.
+ ///
+ /// This is the streaming version of [Table::read_snapshot]. Instead of
returning
+ /// all batches at once, it returns a stream that yields record batches as
they
+ /// are read from the underlying file slices.
+ ///
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches from all file slices.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = table.read_snapshot_stream(empty_filters(),
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// println!("Read {} rows", batch.num_rows());
+ /// }
+ /// ```
+ pub async fn read_snapshot_stream<I, S>(
+ &self,
+ filters: I,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ use futures::stream::{self, StreamExt};
+
+ let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
+ // Return empty stream if no commits
+ return Ok(Box::pin(stream::empty()));
+ };
Review Comment:
   Inconsistent error handling compared to read_file_slice_stream. The
read_file_slice_stream function (line 841) returns an error when no commit
timestamp is available, but read_snapshot_stream (lines 889-891) returns an
empty stream. This inconsistency could confuse API users about the expected
behavior. Consider aligning these behaviors for consistency.
```suggestion
        let timestamp = self
            .timeline
            .get_latest_commit_timestamp_as_option()
            .ok_or_else(|| {
                crate::error::CoreError::Timeline(
                    "No commit timestamp available for snapshot read".to_string(),
                )
            })?;
```
##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It
returns a stream
+ /// that yields record batches as they are read, without loading all data
into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice,
&options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let timestamp = match timestamp {
+ Some(ts) => ts.to_string(),
+ None => return Err(crate::error::CoreError::Timeline("No commit
timestamp available".to_string())),
+ };
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp.as_str(),
+ )])?;
+
+ fg_reader.read_file_slice_stream(file_slice, options).await
+ }
+
+ /// Reads the table snapshot as a stream of record batches.
+ ///
+ /// This is the streaming version of [Table::read_snapshot]. Instead of
returning
+ /// all batches at once, it returns a stream that yields record batches as
they
+ /// are read from the underlying file slices.
+ ///
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches from all file slices.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = table.read_snapshot_stream(empty_filters(),
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// println!("Read {} rows", batch.num_rows());
+ /// }
+ /// ```
+ pub async fn read_snapshot_stream<I, S>(
+ &self,
+ filters: I,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ use futures::stream::{self, StreamExt};
+
+ let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
+ // Return empty stream if no commits
+ return Ok(Box::pin(stream::empty()));
+ };
+
+ let filters = from_str_tuples(filters)?;
+ let file_slices = self.get_file_slices_internal(timestamp,
&filters).await?;
+
+ if file_slices.is_empty() {
+ return Ok(Box::pin(stream::empty()));
+ }
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ // Create a stream that iterates through file slices and yields batches
+ let batch_size = options.batch_size;
+ let streams_iter = file_slices.into_iter().map(move |file_slice| {
+ let fg_reader = fg_reader.clone();
+ let options = ReadOptions {
+ projection: None,
+ row_predicate: None,
+ batch_size,
+ as_of_timestamp: None,
+ };
Review Comment:
The projection and row_predicate options from the input ReadOptions are
being ignored. Only batch_size is extracted and used, while projection and
row_predicate are explicitly set to None when creating a new ReadOptions for
each file slice. This means users cannot apply column projections or row-level
filters when using read_snapshot_stream, which breaks the API contract
suggested by the ReadOptions parameter.
```suggestion
            // Clone the caller-provided ReadOptions so that projection,
            // row_predicate, as_of_timestamp, and batch_size are all preserved.
            let mut options = options.clone();
            options.batch_size = batch_size;
```
##########
crates/core/src/file_group/reader.rs:
##########
@@ -325,6 +327,183 @@ impl FileGroupReader {
.block_on(self.read_file_slice_from_paths(base_file_path,
log_file_paths))
}
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of [FileGroupReader::read_file_slice].
+ /// It returns a stream that yields record batches as they are read.
+ ///
+ /// For COW tables or read-optimized mode (base file only), this returns a
true
+ /// streaming iterator from the underlying parquet file, yielding batches
as they
+ /// are read without loading all data into memory.
+ ///
+ /// For MOR tables with log files, this falls back to the
collect-and-merge approach
+ /// and yields the merged result as a single batch.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches. The stream owns all necessary data and is
`'static`.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = reader.read_file_slice_stream(&file_slice,
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
+
+ self.read_file_slice_from_paths_stream(&base_file_path,
log_file_paths, options)
+ .await
+ }
+
+ /// Reads a file slice from paths as a stream of record batches.
+ ///
+ /// This is the streaming version of
[FileGroupReader::read_file_slice_from_paths].
+ ///
+ /// # Arguments
+ /// * `base_file_path` - Relative path to the base file.
+ /// * `log_file_paths` - Iterator of relative paths to log files.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ pub async fn read_file_slice_from_paths_stream<I, S>(
+ &self,
+ base_file_path: &str,
+ log_file_paths: I,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<str>,
+ {
+ let log_file_paths: Vec<String> = log_file_paths
+ .into_iter()
+ .map(|s| s.as_ref().to_string())
+ .collect();
+
+ let use_read_optimized: bool = self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+ .into();
+ let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+ if base_file_only {
+ // True streaming: return a stream from the parquet file
+ self.read_base_file_stream(base_file_path, options).await
+ } else {
+ // Fallback: collect + merge, then yield as single-item stream
+ let batch = self
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await?;
+ Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+ }
+ }
+
+ /// Reads a base file as a stream of record batches.
+ ///
+ /// This method streams record batches directly from the parquet file
without
+ /// loading all data into memory at once.
+ async fn read_base_file_stream(
+ &self,
+ relative_path: &str,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ // Get default batch size from config
+ let default_batch_size: usize = self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::StreamBatchSize)
+ .into();
+
+ let batch_size = options.batch_size.unwrap_or(default_batch_size);
+
+ let parquet_options =
ParquetReadOptions::new().with_batch_size(batch_size);
+
+ let hudi_configs = self.hudi_configs.clone();
+ let path = relative_path.to_string();
+
+ // Get the parquet stream
+ let parquet_stream = self
+ .storage
+ .get_parquet_file_stream(&path, parquet_options)
+ .map_err(|e| ReadFileSliceError(format!("Failed to read path
{path}: {e:?}")))
+ .await?;
+
+ let populates_meta_fields: bool = hudi_configs
+ .get_or_default(HudiTableConfig::PopulatesMetaFields)
+ .into();
+
+ // Create a stream that applies commit time filtering
+ let stream = parquet_stream
+ .into_stream()
+ .filter_map(move |result| {
+ let hudi_configs = hudi_configs.clone();
+ async move {
+ match result {
+ Err(e) => Some(Err(ReadFileSliceError(format!(
+ "Failed to read batch: {e:?}"
+ )))),
+ Ok(batch) => {
+ // Apply commit time filtering if meta fields are
populated
+ let filtered = if populates_meta_fields {
+ match create_commit_time_filter(&hudi_configs,
&batch) {
+ Err(e) => return Some(Err(e)),
+ Ok(Some(mask)) => {
+ match filter_record_batch(&batch,
&mask) {
+ Ok(b) => b,
+ Err(e) => {
+ return
Some(Err(ReadFileSliceError(format!(
+ "Failed to filter records:
{e:?}"
+ ))))
+ }
+ }
+ }
+ Ok(None) => batch,
+ }
+ } else {
+ batch
+ };
+
+ // Only yield non-empty batches
+ if filtered.num_rows() > 0 {
+ Some(Ok(filtered))
+ } else {
+ None
+ }
+ }
+ }
+ }
+ });
+
+ Ok(Box::pin(stream))
+ }
Review Comment:
The projection and row_predicate fields in ReadOptions are defined but never
actually used in the streaming implementation. The read_base_file_stream
function only applies commit time filtering but doesn't handle column
projection (which would need to be passed to ParquetReadOptions.projection as
column indices) or row-level predicates (which would need to be applied after
commit time filtering). This makes these ReadOptions fields non-functional in
the streaming API.
##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It
returns a stream
+ /// that yields record batches as they are read, without loading all data
into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice,
&options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let timestamp = match timestamp {
+ Some(ts) => ts.to_string(),
+ None => return Err(crate::error::CoreError::Timeline("No commit
timestamp available".to_string())),
+ };
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp.as_str(),
+ )])?;
+
+ fg_reader.read_file_slice_stream(file_slice, options).await
+ }
+
+ /// Reads the table snapshot as a stream of record batches.
+ ///
+ /// This is the streaming version of [Table::read_snapshot]. Instead of
returning
+ /// all batches at once, it returns a stream that yields record batches as
they
+ /// are read from the underlying file slices.
+ ///
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches from all file slices.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = table.read_snapshot_stream(empty_filters(),
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// println!("Read {} rows", batch.num_rows());
+ /// }
+ /// ```
+ pub async fn read_snapshot_stream<I, S>(
+ &self,
+ filters: I,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ use futures::stream::{self, StreamExt};
+
+ let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
+ // Return empty stream if no commits
+ return Ok(Box::pin(stream::empty()));
+ };
+
+ let filters = from_str_tuples(filters)?;
+ let file_slices = self.get_file_slices_internal(timestamp,
&filters).await?;
+
+ if file_slices.is_empty() {
+ return Ok(Box::pin(stream::empty()));
+ }
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ // Create a stream that iterates through file slices and yields batches
+ let batch_size = options.batch_size;
+ let streams_iter = file_slices.into_iter().map(move |file_slice| {
+ let fg_reader = fg_reader.clone();
+ let options = ReadOptions {
+ projection: None,
+ row_predicate: None,
+ batch_size,
+ as_of_timestamp: None,
+ };
+ async move {
+ fg_reader.read_file_slice_stream(&file_slice, &options).await
+ }
+ });
+
+ // Chain all the streams together
+ let combined_stream = stream::iter(streams_iter)
+ .then(|fut| fut)
+ .filter_map(|result| async move {
+ match result {
+ Ok(stream) => Some(stream),
+ Err(e) => {
+ // Log error and skip this file slice
+ log::warn!("Failed to read file slice: {:?}", e);
+ None
Review Comment:
Errors from individual file slices are silently dropped with only a warning
logged. This could lead to incomplete or missing data without the caller being
aware. Consider either propagating the error to the stream (allowing the
consumer to decide how to handle it) or accumulating errors and returning them
at the end. If silent skipping is intentional for fault tolerance, this
behavior should be documented in the function's documentation.
```suggestion
            .map(|result| {
                match result {
                    Ok(stream) => stream,
                    Err(e) => {
                        // Log the error and propagate it as an item in the output stream
                        log::warn!("Failed to read file slice: {:?}", e);
                        Box::pin(stream::once(async move { Err(e) })) as _
```
##########
crates/core/src/table/mod.rs:
##########
@@ -790,6 +793,148 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
todo!("read_incremental_changes")
}
+
+ //
=========================================================================
+ // Streaming Read APIs
+ //
=========================================================================
+
+ /// Reads a file slice as a stream of record batches.
+ ///
+ /// This is the streaming version of reading a single file slice. It
returns a stream
+ /// that yields record batches as they are read, without loading all data
into memory.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let file_slices = table.get_file_slices(empty_filters()).await?;
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ ///
+ /// for file_slice in &file_slices {
+ /// let mut stream = table.read_file_slice_stream(file_slice,
&options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_stream(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
+ let timestamp = options
+ .as_of_timestamp
+ .as_deref()
+ .or_else(|| self.timeline.get_latest_commit_timestamp_as_option());
+
+ let timestamp = match timestamp {
+ Some(ts) => ts.to_string(),
+ None => return Err(crate::error::CoreError::Timeline("No commit
timestamp available".to_string())),
+ };
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp.as_str(),
+ )])?;
+
+ fg_reader.read_file_slice_stream(file_slice, options).await
+ }
+
+ /// Reads the table snapshot as a stream of record batches.
+ ///
+ /// This is the streaming version of [Table::read_snapshot]. Instead of
returning
+ /// all batches at once, it returns a stream that yields record batches as
they
+ /// are read from the underlying file slices.
+ ///
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
+ /// * `options` - Read options for configuring the read operation.
+ ///
+ /// # Returns
+ /// A stream of record batches from all file slices.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ /// use hudi::table::ReadOptions;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = table.read_snapshot_stream(empty_filters(),
&options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// println!("Read {} rows", batch.num_rows());
+ /// }
+ /// ```
+ pub async fn read_snapshot_stream<I, S>(
+ &self,
+ filters: I,
+ options: &ReadOptions,
+ ) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ use futures::stream::{self, StreamExt};
+
+ let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
+ // Return empty stream if no commits
+ return Ok(Box::pin(stream::empty()));
+ };
+
+ let filters = from_str_tuples(filters)?;
+ let file_slices = self.get_file_slices_internal(timestamp,
&filters).await?;
+
+ if file_slices.is_empty() {
+ return Ok(Box::pin(stream::empty()));
+ }
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ // Create a stream that iterates through file slices and yields batches
+ let batch_size = options.batch_size;
+ let streams_iter = file_slices.into_iter().map(move |file_slice| {
+ let fg_reader = fg_reader.clone();
+ let options = ReadOptions {
+ projection: None,
+ row_predicate: None,
+ batch_size,
+ as_of_timestamp: None,
+ };
+ async move {
+ fg_reader.read_file_slice_stream(&file_slice, &options).await
+ }
+ });
+
+ // Chain all the streams together
+ let combined_stream = stream::iter(streams_iter)
+ .then(|fut| fut)
+ .filter_map(|result| async move {
+ match result {
+ Ok(stream) => Some(stream),
+ Err(e) => {
+ // Log error and skip this file slice
+ log::warn!("Failed to read file slice: {:?}", e);
+ None
+ }
+ }
+ })
+ .flatten();
+
+ Ok(Box::pin(combined_stream))
+ }
Review Comment:
The new streaming APIs (read_file_slice_stream and read_snapshot_stream)
lack test coverage. The codebase has comprehensive test coverage for similar
non-streaming read functions in crates/core/tests/table_read_tests.rs. Consider
adding tests for the streaming APIs that verify basic functionality, error
handling, and edge cases like empty results and missing timestamps.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]