This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 44f4be20b4 Minor: Add doc example to RecordBatchStreamAdapter (#13725)
44f4be20b4 is described below
commit 44f4be20b4753f481caeea81f24ddc45a8821075
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Dec 12 09:04:42 2024 -0500
Minor: Add doc example to RecordBatchStreamAdapter (#13725)
* Minor: Add doc example to RecordBatchStreamAdapter
* Update datafusion/physical-plan/src/stream.rs
Co-authored-by: Berkay Şahin
<[email protected]>
---------
Co-authored-by: Berkay Şahin
<[email protected]>
---
datafusion/physical-plan/src/stream.rs | 27 +++++++++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
diff --git a/datafusion/physical-plan/src/stream.rs
b/datafusion/physical-plan/src/stream.rs
index b3054299b7..a05b46d228 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -337,7 +337,9 @@ impl RecordBatchReceiverStream {
pin_project! {
/// Combines a [`Stream`] with a [`SchemaRef`] implementing
- /// [`RecordBatchStream`] for the combination
+ /// [`SendableRecordBatchStream`] for the combination
+ ///
+ /// See [`Self::new`] for an example
pub struct RecordBatchStreamAdapter<S> {
schema: SchemaRef,
@@ -347,7 +349,28 @@ pin_project! {
}
impl<S> RecordBatchStreamAdapter<S> {
- /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema
and stream
+ /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema
and stream.
+ ///
+ /// Note to create a [`SendableRecordBatchStream`] you pin the result
+ ///
+ /// # Example
+ /// ```
+ /// # use arrow::array::record_batch;
+ /// # use datafusion_execution::SendableRecordBatchStream;
+ /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+ /// // Create stream of Result<RecordBatch>
+ /// let batch = record_batch!(
+ /// ("a", Int32, [1, 2, 3]),
+ /// ("b", Float64, [Some(4.0), None, Some(5.0)])
+ /// ).expect("created batch");
+ /// let schema = batch.schema();
+ /// let stream = futures::stream::iter(vec![Ok(batch)]);
+ /// // Convert the stream to a SendableRecordBatchStream
+ /// let adapter = RecordBatchStreamAdapter::new(schema, stream);
+ /// // Now you can use the adapter as a SendableRecordBatchStream
+ /// let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
+ /// // ...
+ /// ```
pub fn new(schema: SchemaRef, stream: S) -> Self {
Self { schema, stream }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]