tustvold commented on code in PR #4046:
URL: https://github.com/apache/arrow-rs/pull/4046#discussion_r1162538577


##########
arrow-csv/src/reader/mod.rs:
##########
@@ -39,6 +41,84 @@
 //! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, 
None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`Decoder`] can be integrated with various forms of async 
data streams.
+//!
+//! For example, see below for how it can be used with an arbitrary `Stream` 
of `Bytes`
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//!     mut decoder: Decoder,
+//!     mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     let mut buffered = Bytes::new();
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             if buffered.is_empty() {
+//!                 if let Some(b) = ready!(input.poll_next_unpin(cx)) {
+//!                     buffered = b;
+//!                 }
+//!             }
+//!             let decoded = match decoder.decode(buffered.as_ref()) {
+//!                 // Note: the decoder needs to be called with an empty
+//!                 // array to delimit the final record

Review Comment:
   The fallthrough of the branch above will end up here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to