tustvold commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2192816955
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
pub fn avro_header(&self) -> &Header {
&self.header
}
-}
-impl<R: BufRead> Reader<R> {
/// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
- if self.finished {
- return Ok(None);
- }
- loop {
- if !self.block_data.is_empty() {
- let consumed = self.decoder.decode(&self.block_data)?;
- if consumed > 0 {
- self.block_data.drain(..consumed);
- }
- match self.decoder.flush()? {
- None => {
- if !self.block_data.is_empty() {
- break;
- }
- }
- Some(batch) => {
- return Ok(Some(batch));
- }
- }
- }
- let maybe_block = {
+ 'outer: while !self.finished && !self.decoder.batch_is_full() {
+ while self.block_cursor == self.block_data.len() {
let buf = self.reader.fill_buf()?;
if buf.is_empty() {
- None
- } else {
- let read_len = buf.len();
- let consumed_len = self.block_decoder.decode(buf)?;
- self.reader.consume(consumed_len);
- if consumed_len == 0 && read_len != 0 {
- return Err(ArrowError::ParseError(
- "Could not decode next Avro block from partial
data".to_string(),
- ));
- }
- self.block_decoder.flush()
+ self.finished = true;
+ break 'outer;
}
- };
- match maybe_block {
- Some(block) => {
- let block_data = if let Some(ref codec) = self.compression
{
+ // Try to decode another block from the buffered reader.
+ let consumed = self.block_decoder.decode(buf)?;
+ self.reader.consume(consumed);
+ if let Some(block) = self.block_decoder.flush() {
+ // Successfully decoded a block.
+ let block_data = if let Some(ref codec) =
self.header.compression()? {
codec.decompress(&block.data)?
} else {
block.data
};
self.block_data = block_data;
+ self.block_cursor = 0;
+ } else if consumed == 0 {
+ // The block decoder made no progress on a non-empty
buffer.
+ return Err(ArrowError::ParseError(
+ "Could not decode next Avro block from partial
data".to_string(),
+ ));
}
- None => {
- self.finished = true;
- if !self.block_data.is_empty() {
- let consumed = self.decoder.decode(&self.block_data)?;
- self.block_data.drain(..consumed);
- }
- return self.decoder.flush();
- }
+ }
+ // Try to decode more rows from the current block.
+ let consumed =
self.decoder.decode(&self.block_data[self.block_cursor..])?;
+ if consumed == 0 && self.block_cursor < self.block_data.len() {
+ self.block_cursor = self.block_data.len();
Review Comment:
I'm afraid I don't have the capacity to review at the moment, or
realistically for the foreseeable future. However, the major goal of the
structure of the reader was to make it possible to read in a stream-oriented
fashion - a la the CSV or JSON decoders. This makes it possible to use the
reader with undelimited data streams from the likes of object_store.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]