jecsand838 commented on code in PR #7834: URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2192727836
########## arrow-avro/src/reader/mod.rs: ##########

```diff
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();
```

Review Comment:

> How does the reader determine that they have encountered such a record in the first place, if it occupies zero bytes?

The reader "detects" a zero-byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this: `{"type": "record", "name": "EmptyRecord", "fields": []}`.

> How does the reader infer the number of zero-byte records the writer intended to produce?

It doesn't, and that's the core of the problem. The Avro binary format specifies that each data block begins with a count of the records it contains. However, the current `Reader` implementation does not use this record count to drive the decoding process (I plan on adding that logic soon). Instead, it decodes the raw byte payload of the block (`self.block_data`) until the buffer is empty. When it encounters the first zero-byte record, it consumes 0 bytes and is unable to make progress. It has no way of knowing whether the writer intended a zero-byte record, let alone how many, because there are no bytes in the stream to differentiate them. The sketch below makes the ambiguity concrete.
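Here is a minimal, self-contained illustration of what the block decoder actually sees. `read_zigzag_long` is a hypothetical helper written for this comment, not the PR's `BlockDecoder` API; per the Avro spec, a data block starts with two zig-zag varint longs (the record count, then the byte length of the serialized records):

```rust
/// Hypothetical helper: decode one zig-zag varint long from `buf`,
/// returning the value and the number of bytes consumed.
fn read_zigzag_long(buf: &[u8]) -> Option<(i64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate() {
        if i >= 10 {
            return None; // malformed: an Avro long fits in at most 10 varint bytes
        }
        value |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            // Undo zig-zag encoding: (n >> 1) ^ -(n & 1)
            return Some(((value >> 1) as i64 ^ -((value & 1) as i64), i + 1));
        }
    }
    None // need more bytes
}

fn main() {
    // A block header claiming 3 records in 0 payload bytes: perfectly legal
    // when the schema is {"type": "record", "name": "EmptyRecord", "fields": []}.
    let header = [0x06u8, 0x00]; // zigzag(3) = 6, zigzag(0) = 0
    let (count, used) = read_zigzag_long(&header).unwrap();
    let (byte_len, _) = read_zigzag_long(&header[used..]).unwrap();
    assert_eq!((count, byte_len), (3, 0));
    // The header's record count is the only place those 3 rows are
    // represented; a byte-driven decode loop never sees them.
}
```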
This ambiguity leads to the infinite loop that the code in question is trying to prevent.

> How does one get "past" the record to read the (presumably normal) records that fill the rest of the block?

You can't as currently implemented, which is precisely why I added that check as a short-term fix. I'll need to add logic like this:

```rust
let record_count = block.num_records; // Read from the block header
for _ in 0..record_count {
    if self.decoder.batch_is_full() {
        // Stop if the Arrow batch is full, but remember our progress
        // in this Avro block for the next call to `read()`.
        break;
    }
    let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
    self.block_cursor += consumed;
}
```

I just need to research the best way to go about this so that it covers all the edge cases (a fuller standalone sketch follows at the end of this comment). I'm planning a follow-up to this work (before it's public) that both resolves this scenario and enables schema evolution. I just didn't want this PR to get too large and complicated, but I'm also happy to add the logic here as well.

Also @tustvold, I'm not sure what your original vision was for this reader; I want to make sure I keep true to that as well.
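For completeness, here is the count-driven idea above as a self-contained sketch. `RowDecoder`, `decode_one`, and `decode_block` are hypothetical stand-ins for illustration only, not the PR's actual types:

```rust
/// Hypothetical stand-in for the PR's per-row decoder.
struct RowDecoder {
    rows_decoded: u64,
}

impl RowDecoder {
    /// Decode a single record from `data`, returning the bytes consumed.
    /// For an empty record schema this is legitimately 0.
    fn decode_one(&mut self, _data: &[u8]) -> usize {
        self.rows_decoded += 1;
        0 // an empty record occupies zero bytes on the wire
    }
}

/// Decode exactly `num_records` rows from one block payload. Looping on the
/// count from the block header (instead of on "buffer not yet empty") lets
/// zero-byte records terminate naturally instead of spinning forever.
fn decode_block(decoder: &mut RowDecoder, block_data: &[u8], num_records: u64) -> usize {
    let mut cursor = 0;
    for _ in 0..num_records {
        cursor += decoder.decode_one(&block_data[cursor..]);
    }
    cursor
}

fn main() {
    let mut decoder = RowDecoder { rows_decoded: 0 };
    // Three empty records: zero payload bytes, but the header said 3 rows.
    let consumed = decode_block(&mut decoder, &[], 3);
    assert_eq!((decoder.rows_decoded, consumed), (3, 0));
}
```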