This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch ARROW-5182 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit dc6171c6a621b29aaf8b9ee6cba7f39b34a9b352 Author: Neville Dipale <[email protected]> AuthorDate: Wed Dec 11 17:10:35 2019 +0200 ARROW-7193: [Rust] Arrow stream reader * Moved `arrow/ipc/file/reader` to `arrow/ipc/reader` * Renamed the file reader from `Reader` to `FileReader` * Added a `StreamReader` The stream reader currently terminates on a 0-byte read, but not on an EOS marker. We might need to work on the latter later, as I could not see the marker when inspecting the test files with a hex-editor. --- rust/arrow/src/ipc/file/mod.rs | 18 ---- rust/arrow/src/ipc/mod.rs | 2 +- rust/arrow/src/ipc/{file => }/reader.rs | 147 ++++++++++++++++++++++++++++++-- 3 files changed, 141 insertions(+), 26 deletions(-) diff --git a/rust/arrow/src/ipc/file/mod.rs b/rust/arrow/src/ipc/file/mod.rs deleted file mode 100644 index 4953590..0000000 --- a/rust/arrow/src/ipc/file/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod reader; diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs index 41f8150..8c3bc08 100644 --- a/rust/arrow/src/ipc/mod.rs +++ b/rust/arrow/src/ipc/mod.rs @@ -16,7 +16,7 @@ // under the License. 
pub mod convert; -pub mod file; +pub mod reader; pub mod gen; diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/reader.rs similarity index 81% rename from rust/arrow/src/ipc/file/reader.rs rename to rust/arrow/src/ipc/reader.rs index e192964..78857a6 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -//! Arrow File Reader +//! Arrow IPC File and Stream Readers +//! +//! The `FileReader` and `StreamReader` have similar interfaces, +//! however the `FileReader` expects a reader that supports `Seek`ing use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -376,8 +379,8 @@ fn read_record_batch( } /// Arrow File reader -pub struct Reader<R: Read + Seek> { - /// Buffered reader that supports reading and seeking +pub struct FileReader<R: Read + Seek> { + /// Buffered file reader that supports reading and seeking reader: BufReader<R>, /// The schema that is read from the file header schema: Arc<Schema>, @@ -391,8 +394,8 @@ pub struct Reader<R: Read + Seek> { total_blocks: usize, } -impl<R: Read + Seek> Reader<R> { - /// Try to create a new reader +impl<R: Read + Seek> FileReader<R> { + /// Try to create a new file reader /// /// Returns errors if the file does not meet the Arrow Format header and footer /// requirements @@ -521,7 +524,106 @@ impl<R: Read + Seek> Reader<R> { } } -impl<R: Read + Seek> RecordBatchReader for Reader<R> { +impl<R: Read + Seek> RecordBatchReader for FileReader<R> { + fn schema(&mut self) -> SchemaRef { + self.schema.clone() + } + + fn next_batch(&mut self) -> Result<Option<RecordBatch>> { + self.next() + } +} + +/// Arrow Stream reader +pub struct StreamReader<R: Read> { + reader: BufReader<R>, + schema: Arc<Schema>, + finished: bool, +} + +impl<R: Read> StreamReader<R> { + /// Try to create a new stream reader + /// + /// The first message in the stream is the schema, the reader 
will fail if it does not + /// encounter a schema. + /// To check if the reader is done, use `is_finished(self)` + pub fn try_new(reader: R) -> Result<Self> { + let mut reader = BufReader::new(reader); + // determine metadata length + let mut meta_size: [u8; 4] = [0; 4]; + reader.read_exact(&mut meta_size)?; + let meta_len = u32::from_le_bytes(meta_size); + + let mut meta_buffer = vec![0; meta_len as usize]; + reader.read_exact(&mut meta_buffer)?; + + let vecs = &meta_buffer.to_vec(); + let message = ipc::get_root_as_message(vecs); + // message header is a Schema, so read it + let ipc_schema: ipc::Schema = message + .header_as_schema() + .expect("Arrow Stream must start with a schema"); + let schema = ipc::convert::fb_to_schema(ipc_schema); + + Ok(Self { + reader, + schema: Arc::new(schema), + finished: false, + }) + } + + /// Return the schema of the stream + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Read the next record batch + pub fn next(&mut self) -> Result<Option<RecordBatch>> { + if self.finished { + return Ok(None); + } + // determine metadata length + let mut meta_size: [u8; 4] = [0; 4]; + self.reader.read_exact(&mut meta_size)?; + let meta_len = u32::from_le_bytes(meta_size); + + if meta_len == 0 { + // the stream has ended, mark the reader as finished + self.finished = true; + return Ok(None); + } + + let mut meta_buffer = vec![0; meta_len as usize]; + self.reader.read_exact(&mut meta_buffer)?; + + let vecs = &meta_buffer.to_vec(); + let message = ipc::get_root_as_message(vecs); + + match message.header_type() { + ipc::MessageHeader::Schema => { + panic!("Not expecting a schema when messages are read") + } + ipc::MessageHeader::RecordBatch => { + let batch = message.header_as_record_batch().unwrap(); + // read the block that makes up the record batch into a buffer + let mut buf = vec![0; message.bodyLength() as usize]; + self.reader.read_exact(&mut buf)?; + + read_record_batch(&buf, batch, self.schema()) + } + _ => 
unimplemented!( + "reading types other than record batches not yet supported" + ), + } + } + + /// Check if the stream is finished + pub fn is_finished(&self) -> bool { + self.finished + } +} + +impl<R: Read> RecordBatchReader for StreamReader<R> { fn schema(&mut self) -> SchemaRef { self.schema.clone() } @@ -560,11 +662,42 @@ mod tests { )) .unwrap(); - let mut reader = Reader::try_new(file).unwrap(); + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_generated_streams() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc/integration/0.14.1/{}.stream", + testdata, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output let arrow_json = read_gzip_json(path); assert!(arrow_json.equals_reader(&mut reader)); + // the next batch must be empty + assert!(reader.next().unwrap().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); }); }
