alamb commented on code in PR #5531:
URL: https://github.com/apache/arrow-rs/pull/5531#discussion_r1535891144
##########
arrow-ipc/src/convert.rs:
##########
@@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>(
builder.finish()
}
+/// An owned container for a validated [`Message`]
+///
+/// Safely decoding a flatbuffer requires validating the various embedded
offsets,
+/// see [`Verifier`]. This is a potentially expensive operation, and it is
therefore desirable
+/// to only do this once. [`crate::root_as_message`] performs this validation
on construction,
+/// however, it returns a [`Message`] borrowing the provided byte slice. This
prevents
+/// storing this [`Message`] in the same data structure that owns the buffer,
as this
+/// would require self-referential borrows.
+///
+/// [`MessageBuffer`] solves this problem by providing a safe API for a
[`Message`]
+/// without a lifetime bound.
+#[derive(Clone)]
+pub struct MessageBuffer(Buffer);
+
+impl Debug for MessageBuffer {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ self.as_ref().fmt(f)
+ }
+}
+
+impl MessageBuffer {
+ /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
+ pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
+ let opts = VerifierOptions::default();
+ let mut v = Verifier::new(&opts, &buf);
+ <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
+ ArrowError::ParseError(format!("Unable to get root as message:
{err:?}"))
+ })?;
+ Ok(Self(buf))
+ }
+
+ /// Return the [`Message`]
+ #[inline]
+ pub fn as_ref(&self) -> Message<'_> {
+ // SAFETY: Run verifier on construction
Review Comment:
I was worried that this is not a safe API, since this is a public struct
and it could be created directly:
```rust
let buf = MessageBuffer(my_unsafe_buffer);
// cast to message without checks being called:
let my_unsafe_message = buf.as_ref();
```
However, I convinced myself it was actually safe as long as `MessageBuffer`
is used from some other module:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=efb81e5fe993e75995b09e8773d31643
##########
arrow-integration-testing/tests/ipc_reader.rs:
##########
@@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str,
path: &str) {
let filename =
format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream");
println!("Verifying {filename}");
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+
// Compare contents to the expected output format in JSON
{
println!(" verifying content");
let file = File::open(&filename).unwrap();
let mut reader = StreamReader::try_new(file, None).unwrap();
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
}
+
+ // Test stream decoder
+ let expected = arrow_json.get_record_batches().unwrap();
Review Comment:
✅ this is a very nice test
##########
arrow-ipc/src/reader/stream.rs:
##########
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_schema::{ArrowError, SchemaRef};
+
+use crate::convert::MessageBuffer;
+use crate::reader::{read_dictionary, read_record_batch};
+use crate::{MessageHeader, CONTINUATION_MARKER};
+
+/// A low-level interface for reading [`RecordBatch`] data from a stream of
bytes
+///
+/// See [StreamReader](crate::reader::StreamReader) for a higher-level
interface
+#[derive(Debug, Default)]
+pub struct StreamDecoder {
+ /// The schema of this decoder, if read
+ schema: Option<SchemaRef>,
+ /// Lookup table for dictionaries by ID
+ dictionaries: HashMap<i64, ArrayRef>,
+ /// The decoder state
+ state: DecoderState,
+ /// A scratch buffer when a read is split across multiple `Buffer`
+ buf: MutableBuffer,
+}
+
+#[derive(Debug)]
+enum DecoderState {
+ /// Decoding the message header
+ Header {
+ /// Temporary buffer
+ buf: [u8; 4],
+ /// Number of bytes read into buf
+ read: u8,
+ /// If we have read a continuation token
+ continuation: bool,
+ },
+ /// Decoding the message flatbuffer
+ Message {
+ /// The size of the message flatbuffer
+ size: u32,
+ },
+ /// Decoding the message body
+ Body {
+ /// The message flatbuffer
+ message: MessageBuffer,
+ },
+ /// Reached the end of the stream
+ Finished,
+}
+
+impl Default for DecoderState {
+ fn default() -> Self {
+ Self::Header {
+ buf: [0; 4],
+ read: 0,
+ continuation: false,
+ }
+ }
+}
+
+impl StreamDecoder {
+ /// Create a new [`StreamDecoder`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
+ ///
+ /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
+ ///
+ /// The push-based interface facilitates integration with sources that
yield arbitrarily
+ /// delimited bytes ranges, such as a chunked byte stream received from
object storage
+ ///
+ /// ```
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_buffer::Buffer;
+ /// # use arrow_ipc::reader::StreamDecoder;
+ /// # use arrow_schema::ArrowError;
+ /// #
+ /// fn print_stream<I>(src: impl Iterator<Item = Buffer>) -> Result<(),
ArrowError> {
+ /// let mut decoder = StreamDecoder::new();
+ /// for mut x in src {
+ /// while !x.is_empty() {
+ /// if let Some(x) = decoder.decode(&mut x)? {
+ /// println!("{x:?}");
+ /// }
+ /// }
+ /// }
+ /// decoder.finish().unwrap();
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn decode(&mut self, buffer: &mut Buffer) ->
Result<Option<RecordBatch>, ArrowError> {
Review Comment:
I double-checked that `decode` is consistent with the names used by the CSV reader.
##########
arrow-ipc/src/convert.rs:
##########
@@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>(
builder.finish()
}
+/// An owned container for a validated [`Message`]
+///
+/// Safely decoding a flatbuffer requires validating the various embedded
offsets,
+/// see [`Verifier`]. This is a potentially expensive operation, and it is
therefore desirable
+/// to only do this once. [`crate::root_as_message`] performs this validation
on construction,
+/// however, it returns a [`Message`] borrowing the provided byte slice. This
prevents
+/// storing this [`Message`] in the same data structure that owns the buffer,
as this
+/// would require self-referential borrows.
+///
+/// [`MessageBuffer`] solves this problem by providing a safe API for a
[`Message`]
+/// without a lifetime bound.
+#[derive(Clone)]
+pub struct MessageBuffer(Buffer);
+
+impl Debug for MessageBuffer {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ self.as_ref().fmt(f)
+ }
+}
+
+impl MessageBuffer {
+ /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
+ pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
+ let opts = VerifierOptions::default();
+ let mut v = Verifier::new(&opts, &buf);
+ <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
+ ArrowError::ParseError(format!("Unable to get root as message:
{err:?}"))
+ })?;
+ Ok(Self(buf))
+ }
+
+ /// Return the [`Message`]
+ #[inline]
+ pub fn as_ref(&self) -> Message<'_> {
Review Comment:
I think it would be more standard to call this function `as_message` and
then provide an `impl AsRef<Message> for MessageBuffer` and that way the
compiler can automatically do the deref.
##########
arrow-integration-testing/tests/ipc_reader.rs:
##########
@@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str,
path: &str) {
let filename =
format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream");
println!("Verifying {filename}");
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+
// Compare contents to the expected output format in JSON
{
println!(" verifying content");
let file = File::open(&filename).unwrap();
let mut reader = StreamReader::try_new(file, None).unwrap();
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
}
+
+ // Test stream decoder
+ let expected = arrow_json.get_record_batches().unwrap();
+ for chunk_sizes in [1, 2, 8, 123] {
+ let mut decoder = StreamDecoder::new();
+ let stream = chunked_file(&filename, chunk_sizes);
+ let mut actual = Vec::with_capacity(expected.len());
+ for mut x in stream {
+ while !x.is_empty() {
+ if let Some(x) = decoder.decode(&mut x).unwrap() {
+ actual.push(x);
+ }
+ }
+ }
+ decoder.finish().unwrap();
+ assert_eq!(expected, actual);
Review Comment:
Can we also add tests for error conditions?
At the very least, I think we should have a test for a truncated stream, which I
think would be the most likely error in practice.
It would be nice to have a test for things like missing dictionaries, but I
think that the value of those tests is lower.
##########
arrow-ipc/src/reader/stream.rs:
##########
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_schema::{ArrowError, SchemaRef};
+
+use crate::convert::MessageBuffer;
+use crate::reader::{read_dictionary, read_record_batch};
+use crate::{MessageHeader, CONTINUATION_MARKER};
+
+/// A low-level interface for reading [`RecordBatch`] data from a stream of
bytes
+///
+/// See [StreamReader](crate::reader::StreamReader) for a higher-level
interface
+#[derive(Debug, Default)]
+pub struct StreamDecoder {
+ /// The schema of this decoder, if read
+ schema: Option<SchemaRef>,
+ /// Lookup table for dictionaries by ID
+ dictionaries: HashMap<i64, ArrayRef>,
+ /// The decoder state
+ state: DecoderState,
+ /// A scratch buffer when a read is split across multiple `Buffer`
+ buf: MutableBuffer,
+}
+
+#[derive(Debug)]
+enum DecoderState {
+ /// Decoding the message header
+ Header {
+ /// Temporary buffer
+ buf: [u8; 4],
+ /// Number of bytes read into buf
+ read: u8,
+ /// If we have read a continuation token
+ continuation: bool,
+ },
+ /// Decoding the message flatbuffer
+ Message {
+ /// The size of the message flatbuffer
+ size: u32,
+ },
+ /// Decoding the message body
+ Body {
+ /// The message flatbuffer
+ message: MessageBuffer,
+ },
+ /// Reached the end of the stream
+ Finished,
+}
+
+impl Default for DecoderState {
+ fn default() -> Self {
+ Self::Header {
+ buf: [0; 4],
+ read: 0,
+ continuation: false,
+ }
+ }
+}
+
+impl StreamDecoder {
+ /// Create a new [`StreamDecoder`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
+ ///
+ /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
+ ///
+ /// The push-based interface facilitates integration with sources that
yield arbitrarily
+ /// delimited bytes ranges, such as a chunked byte stream received from
object storage
+ ///
+ /// ```
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_buffer::Buffer;
+ /// # use arrow_ipc::reader::StreamDecoder;
+ /// # use arrow_schema::ArrowError;
+ /// #
+ /// fn print_stream<I>(src: impl Iterator<Item = Buffer>) -> Result<(),
ArrowError> {
+ /// let mut decoder = StreamDecoder::new();
+ /// for mut x in src {
+ /// while !x.is_empty() {
+ /// if let Some(x) = decoder.decode(&mut x)? {
+ /// println!("{x:?}");
+ /// }
+ /// }
+ /// }
+ /// decoder.finish().unwrap();
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn decode(&mut self, buffer: &mut Buffer) ->
Result<Option<RecordBatch>, ArrowError> {
+ while !buffer.is_empty() {
+ match &mut self.state {
+ DecoderState::Header {
+ buf,
+ read,
+ continuation,
+ } => {
+ let offset_buf = &mut buf[*read as usize..];
+ let to_read = buffer.len().min(offset_buf.len());
+ offset_buf[..to_read].copy_from_slice(&buffer[..to_read]);
+ *read += to_read as u8;
+ buffer.advance(to_read);
+ if *read == 4 {
+ if !*continuation && buf == &CONTINUATION_MARKER {
+ *continuation = true;
+ *read = 0;
+ continue;
+ }
+ let size = u32::from_le_bytes(*buf);
+
+ if size == 0 {
+ self.state = DecoderState::Finished;
+ continue;
+ }
+ self.state = DecoderState::Message { size };
+ }
+ }
+ DecoderState::Message { size } => {
+ let len = *size as usize;
+ if self.buf.is_empty() && buffer.len() > len {
+ let message =
MessageBuffer::try_new(buffer.slice_with_length(0, len))?;
+ self.state = DecoderState::Body { message };
+ buffer.advance(len);
+ continue;
+ }
+
+ let to_read = buffer.len().min(len - self.buf.len());
+ self.buf.extend_from_slice(&buffer[..to_read]);
+ buffer.advance(to_read);
+ if self.buf.len() == len {
+ let message =
MessageBuffer::try_new(std::mem::take(&mut self.buf).into())?;
+ self.state = DecoderState::Body { message };
+ }
+ }
+ DecoderState::Body { message } => {
+ let message = message.as_ref();
+ let body_length = message.bodyLength() as usize;
+
+ let body = if self.buf.is_empty() && buffer.len() >=
body_length {
+ let body = buffer.slice_with_length(0, body_length);
+ buffer.advance(body_length);
+ body
+ } else {
+ let to_read = buffer.len().min(body_length -
self.buf.len());
+ self.buf.extend_from_slice(&buffer[..to_read]);
+ buffer.advance(to_read);
+
+ if self.buf.len() != body_length {
+ continue;
+ }
+ std::mem::take(&mut self.buf).into()
+ };
+
+ let version = message.version();
+ match message.header_type() {
+ MessageHeader::Schema => {
+ if self.schema.is_some() {
+ return Err(ArrowError::IpcError(
+ "Not expecting a schema when messages are
read".to_string(),
+ ));
+ }
+
+ let ipc_schema =
message.header_as_schema().unwrap();
+ let schema =
crate::convert::fb_to_schema(ipc_schema);
+ self.state = DecoderState::default();
+ self.schema = Some(Arc::new(schema));
+ }
+ MessageHeader::RecordBatch => {
+ let batch =
message.header_as_record_batch().unwrap();
+ let schema = self.schema.clone().ok_or_else(|| {
+ ArrowError::IpcError("Missing
schema".to_string())
+ })?;
+ let batch = read_record_batch(
+ &body,
+ batch,
+ schema,
+ &self.dictionaries,
+ None,
+ &version,
+ )?;
+ self.state = DecoderState::default();
+ return Ok(Some(batch));
+ }
+ MessageHeader::DictionaryBatch => {
+ let dictionary =
message.header_as_dictionary_batch().unwrap();
+ let schema = self.schema.as_deref().ok_or_else(|| {
+ ArrowError::IpcError("Missing
schema".to_string())
+ })?;
+ read_dictionary(
+ &body,
+ dictionary,
+ schema,
+ &mut self.dictionaries,
+ &version,
+ )?;
+ self.state = DecoderState::default();
+ }
+ MessageHeader::NONE => {
+ self.state = DecoderState::default();
+ }
+ t => {
+ return Err(ArrowError::IpcError(format!(
+ "Message type unsupported by StreamDecoder:
{t:?}"
+ )))
+ }
+ }
+ }
+ DecoderState::Finished => {
+ return Err(ArrowError::IpcError("Unexpected
EOS".to_string()))
+ }
+ }
+ }
+ Ok(None)
+ }
+
+ /// Signal the end of stream
+ ///
+ /// Returns an error if this would truncate a message
Review Comment:
The phrase "truncate a message" is confusing to me, as this code isn't creating
messages and therefore couldn't truncate them. Maybe this would be clearer:
```suggestion
/// Returns an error if any partial data remains in the stream
```
##########
arrow-ipc/src/reader/stream.rs:
##########
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_schema::{ArrowError, SchemaRef};
+
+use crate::convert::MessageBuffer;
+use crate::reader::{read_dictionary, read_record_batch};
+use crate::{MessageHeader, CONTINUATION_MARKER};
+
+/// A low-level interface for reading [`RecordBatch`] data from a stream of
bytes
+///
+/// See [StreamReader](crate::reader::StreamReader) for a higher-level
interface
+#[derive(Debug, Default)]
+pub struct StreamDecoder {
+ /// The schema of this decoder, if read
+ schema: Option<SchemaRef>,
+ /// Lookup table for dictionaries by ID
+ dictionaries: HashMap<i64, ArrayRef>,
+ /// The decoder state
+ state: DecoderState,
+ /// A scratch buffer when a read is split across multiple `Buffer`
+ buf: MutableBuffer,
+}
+
+#[derive(Debug)]
+enum DecoderState {
+ /// Decoding the message header
+ Header {
+ /// Temporary buffer
+ buf: [u8; 4],
+ /// Number of bytes read into buf
+ read: u8,
+ /// If we have read a continuation token
+ continuation: bool,
+ },
+ /// Decoding the message flatbuffer
+ Message {
+ /// The size of the message flatbuffer
+ size: u32,
+ },
+ /// Decoding the message body
+ Body {
+ /// The message flatbuffer
+ message: MessageBuffer,
+ },
+ /// Reached the end of the stream
+ Finished,
+}
+
+impl Default for DecoderState {
+ fn default() -> Self {
+ Self::Header {
+ buf: [0; 4],
+ read: 0,
+ continuation: false,
+ }
+ }
+}
+
+impl StreamDecoder {
+ /// Create a new [`StreamDecoder`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
+ ///
+ /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
+ ///
+ /// The push-based interface facilitates integration with sources that
yield arbitrarily
+ /// delimited bytes ranges, such as a chunked byte stream received from
object storage
+ ///
+ /// ```
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_buffer::Buffer;
+ /// # use arrow_ipc::reader::StreamDecoder;
+ /// # use arrow_schema::ArrowError;
+ /// #
+ /// fn print_stream<I>(src: impl Iterator<Item = Buffer>) -> Result<(),
ArrowError> {
+ /// let mut decoder = StreamDecoder::new();
+ /// for mut x in src {
+ /// while !x.is_empty() {
+ /// if let Some(x) = decoder.decode(&mut x)? {
+ /// println!("{x:?}");
+ /// }
+ /// }
+ /// }
+ /// decoder.finish().unwrap();
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn decode(&mut self, buffer: &mut Buffer) ->
Result<Option<RecordBatch>, ArrowError> {
+ while !buffer.is_empty() {
+ match &mut self.state {
+ DecoderState::Header {
+ buf,
+ read,
+ continuation,
+ } => {
+ let offset_buf = &mut buf[*read as usize..];
+ let to_read = buffer.len().min(offset_buf.len());
+ offset_buf[..to_read].copy_from_slice(&buffer[..to_read]);
+ *read += to_read as u8;
+ buffer.advance(to_read);
+ if *read == 4 {
+ if !*continuation && buf == &CONTINUATION_MARKER {
+ *continuation = true;
+ *read = 0;
+ continue;
+ }
+ let size = u32::from_le_bytes(*buf);
+
+ if size == 0 {
+ self.state = DecoderState::Finished;
+ continue;
+ }
+ self.state = DecoderState::Message { size };
+ }
+ }
+ DecoderState::Message { size } => {
+ let len = *size as usize;
+ if self.buf.is_empty() && buffer.len() > len {
+ let message =
MessageBuffer::try_new(buffer.slice_with_length(0, len))?;
+ self.state = DecoderState::Body { message };
+ buffer.advance(len);
+ continue;
+ }
+
+ let to_read = buffer.len().min(len - self.buf.len());
+ self.buf.extend_from_slice(&buffer[..to_read]);
+ buffer.advance(to_read);
+ if self.buf.len() == len {
+ let message =
MessageBuffer::try_new(std::mem::take(&mut self.buf).into())?;
+ self.state = DecoderState::Body { message };
+ }
+ }
+ DecoderState::Body { message } => {
+ let message = message.as_ref();
+ let body_length = message.bodyLength() as usize;
+
+ let body = if self.buf.is_empty() && buffer.len() >=
body_length {
+ let body = buffer.slice_with_length(0, body_length);
+ buffer.advance(body_length);
+ body
+ } else {
+ let to_read = buffer.len().min(body_length -
self.buf.len());
+ self.buf.extend_from_slice(&buffer[..to_read]);
+ buffer.advance(to_read);
+
+ if self.buf.len() != body_length {
+ continue;
+ }
+ std::mem::take(&mut self.buf).into()
+ };
+
+ let version = message.version();
+ match message.header_type() {
+ MessageHeader::Schema => {
+ if self.schema.is_some() {
+ return Err(ArrowError::IpcError(
+ "Not expecting a schema when messages are
read".to_string(),
+ ));
+ }
+
+ let ipc_schema =
message.header_as_schema().unwrap();
+ let schema =
crate::convert::fb_to_schema(ipc_schema);
+ self.state = DecoderState::default();
+ self.schema = Some(Arc::new(schema));
+ }
+ MessageHeader::RecordBatch => {
+ let batch =
message.header_as_record_batch().unwrap();
+ let schema = self.schema.clone().ok_or_else(|| {
+ ArrowError::IpcError("Missing
schema".to_string())
+ })?;
+ let batch = read_record_batch(
+ &body,
+ batch,
+ schema,
+ &self.dictionaries,
+ None,
+ &version,
+ )?;
+ self.state = DecoderState::default();
+ return Ok(Some(batch));
+ }
+ MessageHeader::DictionaryBatch => {
+ let dictionary =
message.header_as_dictionary_batch().unwrap();
+ let schema = self.schema.as_deref().ok_or_else(|| {
+ ArrowError::IpcError("Missing
schema".to_string())
+ })?;
+ read_dictionary(
+ &body,
+ dictionary,
+ schema,
+ &mut self.dictionaries,
+ &version,
+ )?;
+ self.state = DecoderState::default();
+ }
+ MessageHeader::NONE => {
+ self.state = DecoderState::default();
+ }
+ t => {
+ return Err(ArrowError::IpcError(format!(
+ "Message type unsupported by StreamDecoder:
{t:?}"
+ )))
+ }
+ }
+ }
+ DecoderState::Finished => {
+ return Err(ArrowError::IpcError("Unexpected
EOS".to_string()))
+ }
+ }
+ }
+ Ok(None)
+ }
+
+ /// Signal the end of stream
+ ///
+ /// Returns an error if this would truncate a message
+ pub fn finish(&mut self) -> Result<(), ArrowError> {
+ match self.state {
+ DecoderState::Finished
+ | DecoderState::Header {
+ read: 0,
+ continuation: false,
+ ..
+ } => Ok(()),
+ _ => Err(ArrowError::IpcError("Unexpected EOS".to_string())),
Review Comment:
In this context I know what EOS is, but I have not encountered that term all
that often, so I think we should be more verbose here:
```suggestion
_ => Err(ArrowError::IpcError("Unexpected End of Stream".to_string())),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]