alamb commented on code in PR #13412:
URL: https://github.com/apache/datafusion/pull/13412#discussion_r1844359572


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -651,36 +651,14 @@ impl FileOpener for CsvOpener {
                     Ok(futures::stream::iter(config.open(decoder)?).boxed())
                 }
                 GetResultPayload::Stream(s) => {
-                    let mut decoder = config.builder().build_decoder();
+                    let decoder = config.builder().build_decoder();

Review Comment:
   That is certainly a lot nicer 😍 



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
     Supported,
 }
 
+/// Possible outputs of a [`BatchDeserializer`].
+#[derive(Debug, PartialEq)]
+pub enum DeserializerOutput {
+    /// A successfully deserialized [`RecordBatch`].
+    RecordBatch(RecordBatch),
+    /// The deserializer requires more data to make progress.
+    RequiresMoreData,
+    /// The input data has been exhausted.
+    InputExhausted,
+}
+
+/// Trait defining a scheme for deserializing byte streams into structured data.
+/// Implementors of this trait are responsible for converting raw bytes into
+/// `RecordBatch` objects.
+pub trait BatchDeserializer<T>: Send + Debug {
+    /// Feeds a message for deserialization, updating the internal state of
+    /// this `BatchDeserializer`. Note that one can call this function multiple
+    /// times before calling `next`, which will queue multiple messages for
+    /// deserialization. Returns the number of bytes consumed.
+    fn digest(&mut self, message: T) -> usize;
+
+    /// Attempts to deserialize any pending messages and returns a
+    /// `DeserializerOutput` to indicate progress.
+    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
+
+    /// Informs the deserializer that no more messages will be provided for
+    /// deserialization.
+    fn finish(&mut self);
+}
+
+/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
+/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
+/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
+/// a method to check if the decoder can flush early. Intended to be used in
+/// conjunction with [`DecoderDeserializer`].
+///
+/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
+/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
+/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
+/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
+pub(crate) trait Decoder: Send + Debug {
+    /// See [`arrow::json::reader::Decoder::decode`].
+    ///
+    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode

Review Comment:
   I double-checked, and
   https://docs.rs/arrow-json/53.2.0/arrow_json/reader/struct.Decoder.html
   seems to describe this interface well.
   
   



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
     Supported,
 }
 
+/// Possible outputs of a [`BatchDeserializer`].
+#[derive(Debug, PartialEq)]
+pub enum DeserializerOutput {

Review Comment:
   👍  nice



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
     Supported,
 }
 
+/// Possible outputs of a [`BatchDeserializer`].
+#[derive(Debug, PartialEq)]
+pub enum DeserializerOutput {
+    /// A successfully deserialized [`RecordBatch`].
+    RecordBatch(RecordBatch),
+    /// The deserializer requires more data to make progress.
+    RequiresMoreData,
+    /// The input data has been exhausted.
+    InputExhausted,
+}
+
+/// Trait defining a scheme for deserializing byte streams into structured data.
+/// Implementors of this trait are responsible for converting raw bytes into
+/// `RecordBatch` objects.
+pub trait BatchDeserializer<T>: Send + Debug {
+    /// Feeds a message for deserialization, updating the internal state of
+    /// this `BatchDeserializer`. Note that one can call this function multiple
+    /// times before calling `next`, which will queue multiple messages for
+    /// deserialization. Returns the number of bytes consumed.
+    fn digest(&mut self, message: T) -> usize;
+
+    /// Attempts to deserialize any pending messages and returns a
+    /// `DeserializerOutput` to indicate progress.
+    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
+
+    /// Informs the deserializer that no more messages will be provided for
+    /// deserialization.
+    fn finish(&mut self);
+}
+
+/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
+/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
+/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
+/// a method to check if the decoder can flush early. Intended to be used in
+/// conjunction with [`DecoderDeserializer`].
+///
+/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
+/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
+/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
+/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
+pub(crate) trait Decoder: Send + Debug {
+    /// See [`arrow::json::reader::Decoder::decode`].
+    ///
+    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
+    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
+
+    /// See [`arrow::json::reader::Decoder::flush`].
+    ///
+    /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
+    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
+
+    /// Whether the decoder can flush early in its current state.
+    fn can_flush_early(&self) -> bool;
+}
+
+impl<T: Decoder> Debug for DecoderDeserializer<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Deserializer")
+            .field("buffered_queue", &self.buffered_queue)
+            .field("finalized", &self.finalized)
+            .finish()
+    }
+}
+
+impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
+    fn digest(&mut self, message: Bytes) -> usize {
+        if message.is_empty() {
+            return 0;
+        }
+
+        let consumed = message.len();
+        self.buffered_queue.push_back(message);
+        consumed
+    }
+
+    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
+        while let Some(buffered) = self.buffered_queue.front_mut() {
+            let decoded = self.decoder.decode(buffered)?;
+            buffered.advance(decoded);
+
+            if buffered.is_empty() {
+                self.buffered_queue.pop_front();
+            }
+
+            // Flush when the stream ends or batch size is reached
+            // Certain implementations can flush early
+            if decoded == 0 || self.decoder.can_flush_early() {
+                return match self.decoder.flush() {
+                    Ok(Some(batch)) => Ok(DeserializerOutput::RecordBatch(batch)),
+                    Ok(None) => continue,
+                    Err(e) => Err(e),
+                };
+            }
+        }
+        if self.finalized {
+            Ok(DeserializerOutput::InputExhausted)
+        } else {
+            Ok(DeserializerOutput::RequiresMoreData)
+        }
+    }
+
+    fn finish(&mut self) {
+        self.finalized = true;
+        // Ensure the decoder is flushed:
+        self.buffered_queue.push_back(Bytes::new());
+    }
+}
+
+/// A generic, decoder-based deserialization scheme for processing encoded data.
+///
+/// This struct is responsible for converting a stream of bytes, which represent
+/// encoded data, into a stream of `RecordBatch` objects, following the specified
+/// schema and formatting options.

Review Comment:
   It might also be worth mentioning here that this struct handles any
   buffering of the input that is required to fulfill the decode interface
   (the decoder may return a RecordBatch before fully consuming the input).
   
   It took me a while to figure out why this was required.
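   
   To illustrate the point about buffering, here is a rough, std-only sketch. It does not use the arrow or DataFusion APIs: `ToyDecoder`, `ToyDeserializer`, `Output`, and `run_demo` are hypothetical stand-ins, and the "batches" are just `Vec<String>`. The assumption it models is that `decode` may stop consuming bytes once a batch is full, so the caller has to hold on to the unconsumed tail until the batch has been flushed.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a decoder (hypothetical, not the arrow API): it decodes
/// newline-terminated rows and stops consuming input once `batch_size` rows
/// are pending, even if bytes remain in `buf`.
struct ToyDecoder {
    batch_size: usize,
    rows: Vec<String>,
    partial: Vec<u8>,
}

impl ToyDecoder {
    /// Returns the number of bytes consumed, which may be less than `buf.len()`.
    fn decode(&mut self, buf: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in buf {
            if self.rows.len() == self.batch_size {
                break; // batch is full: the caller must keep the remaining bytes
            }
            consumed += 1;
            if byte == b'\n' {
                self.rows.push(String::from_utf8_lossy(&self.partial).into_owned());
                self.partial.clear();
            } else {
                self.partial.push(byte);
            }
        }
        consumed
    }

    /// Hands out the pending rows, if any.
    fn flush(&mut self) -> Option<Vec<String>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }
}

#[derive(Debug, PartialEq)]
enum Output {
    Batch(Vec<String>),
    RequiresMoreData,
    InputExhausted,
}

/// Owns the queue of not-yet-consumed input on behalf of the decoder.
struct ToyDeserializer {
    decoder: ToyDecoder,
    queue: VecDeque<Vec<u8>>,
    finalized: bool,
}

impl ToyDeserializer {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "a zero batch size would never make progress");
        let decoder = ToyDecoder { batch_size, rows: Vec::new(), partial: Vec::new() };
        Self { decoder, queue: VecDeque::new(), finalized: false }
    }

    /// Queues a message and reports how many bytes were accepted.
    fn digest(&mut self, message: &[u8]) -> usize {
        if message.is_empty() {
            return 0;
        }
        self.queue.push_back(message.to_vec());
        message.len()
    }

    fn next(&mut self) -> Output {
        while let Some(front) = self.queue.front_mut() {
            let decoded = self.decoder.decode(front);
            front.drain(..decoded); // keep only the unconsumed tail
            if front.is_empty() {
                self.queue.pop_front();
            }
            // Zero bytes consumed means "batch full" or "end-of-input marker".
            if decoded == 0 {
                if let Some(batch) = self.decoder.flush() {
                    return Output::Batch(batch);
                }
            }
        }
        if self.finalized { Output::InputExhausted } else { Output::RequiresMoreData }
    }

    /// Marks the end of input; the empty message forces a final flush.
    fn finish(&mut self) {
        self.finalized = true;
        self.queue.push_back(Vec::new());
    }
}

/// Feeds three rows in one message with a batch size of two and collects
/// every batch produced before the input is exhausted.
fn run_demo() -> Vec<Vec<String>> {
    let mut de = ToyDeserializer::new(2);
    de.digest(b"a\nb\nc\n");
    de.finish();
    let mut batches = Vec::new();
    while let Output::Batch(batch) = de.next() {
        batches.push(batch);
    }
    batches
}
```

   In this sketch the single three-row message yields two batches: the first `next` call returns after only part of the message has been consumed, and the leftover bytes stay in the queue for the following call. Without that queue, the tail of the message would be lost.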



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]