jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2192727836


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   > How does the reader determine that they have encountered such a record in 
the first place, if it occupies zero bytes?
   
   The reader "detects" a zero-byte record not by reading bytes from the data 
   stream, but by interpreting the Avro schema it was given. Such a schema would 
   look something like this: `{"type": "record", "name": "EmptyRecord", "fields": []}`.
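   
   As a rough illustration of that schema-side check, here is a minimal sketch. 
   The `Schema` enum and `is_zero_width` function are hypothetical and are not 
   arrow-avro's types; the sketch only assumes that a record is encoded as the 
   concatenation of its fields, so a record with no fields (or only `null` 
   fields) occupies zero bytes.
   
   ```rust
   /// Toy subset of Avro schema types, for illustration only (hypothetical).
   enum Schema {
       Null,
       Long,
       Record(Vec<Schema>), // field types, in declaration order
   }
   
   /// Returns true when every value of this type encodes to zero bytes.
   fn is_zero_width(schema: &Schema) -> bool {
       match schema {
           // `null` is written as zero bytes.
           Schema::Null => true,
           // A long always takes at least one byte.
           Schema::Long => false,
           // A record is the concatenation of its fields, so it is zero-width
           // only if it has no fields or all of its fields are zero-width.
           Schema::Record(fields) => fields.iter().all(is_zero_width),
       }
   }
   ```
   
   With this sketch, `is_zero_width(&Schema::Record(vec![]))` is `true` for the 
   `EmptyRecord` schema above, without looking at a single byte of the stream.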
   
   > How does the reader infer the number of zero-byte records the writer 
intended to produce?
   
   It doesn't, and that's the core of the problem. The Avro object container 
   file format specifies that each data block begins with a count of the records 
   it contains. However, the current Reader implementation does not use this 
   record count to drive the decoding process (I plan on adding that logic soon). 
   Instead, it decodes the raw byte payload of the block (`self.block_data`) until 
   the buffer is empty. When it encounters the first zero-byte record, it consumes 
   0 bytes and cannot make progress. It has no way of knowing whether the writer 
   intended a zero-byte record and, if so, how many, because there are no bytes in 
   the stream to differentiate them. This ambiguity leads to the infinite loop 
   that the code in question is trying to prevent.
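   
   To make the role of the record count concrete, here is a minimal sketch of 
   reading a block header. It assumes the container file layout of a block: a 
   zig-zag varint `long` count, a zig-zag varint `long` payload size, the 
   payload, then a 16-byte sync marker. `read_long`, `BlockHeader`, and 
   `read_block_header` are hypothetical names for illustration, not the 
   arrow-avro block decoder.
   
   ```rust
   /// Decode an Avro `long` (zig-zag, variable-length) from the front of `buf`.
   /// Returns the value and the number of bytes read, or `None` if `buf` ends
   /// early or the encoding runs past 10 bytes.
   fn read_long(buf: &[u8]) -> Option<(i64, usize)> {
       let mut raw: u64 = 0;
       for (i, &byte) in buf.iter().enumerate().take(10) {
           // Each byte carries 7 payload bits, least-significant group first.
           raw |= u64::from(byte & 0x7f) << (7 * i);
           if byte & 0x80 == 0 {
               // Undo the zig-zag mapping: 0, 1, 2, 3 -> 0, -1, 1, -2.
               let value = (raw >> 1) as i64 ^ -((raw & 1) as i64);
               return Some((value, i + 1));
           }
       }
       None
   }
   
   /// Header of one data block (hypothetical type for this sketch).
   #[derive(Debug, PartialEq)]
   struct BlockHeader {
       count: i64,        // number of records the writer put in the block
       size: i64,         // payload size in bytes
       header_len: usize, // bytes occupied by the two longs above
   }
   
   /// Parse the record count and payload size at the start of a block.
   fn read_block_header(buf: &[u8]) -> Option<BlockHeader> {
       let (count, n1) = read_long(buf)?;
       let (size, n2) = read_long(&buf[n1..])?;
       Some(BlockHeader { count, size, header_len: n1 + n2 })
   }
   ```
   
   A block that starts with the bytes `0x06 0x00` parses as `count = 3` and 
   `size = 0`: three records in an empty payload. The count is the only place 
   that information exists, which is why a loop driven purely by the payload 
   bytes cannot recover it.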
   
   > How does one get "past" the record to read the (presumably normal) records 
that fill the rest of the block?
   
   You can't with the current implementation, which is precisely why I added 
   that check as a short-term fix. I'll need to add logic like this:
   
   ```rust
   let record_count = block.num_records; // Read from block header
   for _ in 0..record_count {
       if self.decoder.batch_is_full() {
           // Stop if the Arrow batch is full, but remember our progress
           // in this Avro block for the next call to `read()`.
           break;
       }
       
       let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
       self.block_cursor += consumed;
   }
   ```
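   
   A self-contained version of the same idea, as a sketch only: `BlockState`, 
   `ToyDecoder`, and `fill_batch` are hypothetical stand-ins (the real decoder 
   builds Arrow arrays and may expose a different API). The point it shows is 
   that tracking the remaining record count alongside the byte cursor lets the 
   loop resume across calls and terminate even when each record is zero bytes.
   
   ```rust
   /// Hypothetical per-block state: decompressed payload plus decode progress.
   struct BlockState {
       data: Vec<u8>,
       cursor: usize,    // byte offset of the next record in `data`
       remaining: usize, // records from the block header not yet decoded
   }
   
   /// Toy row decoder: every record occupies `record_size` bytes (0 for an
   /// empty record). It only counts rows instead of building arrays.
   struct ToyDecoder {
       record_size: usize,
       batch_size: usize,
       rows: usize,
   }
   
   impl ToyDecoder {
       fn batch_is_full(&self) -> bool {
           self.rows >= self.batch_size
       }
   
       /// Decode exactly one record and return the bytes it consumed.
       fn decode_one(&mut self, buf: &[u8]) -> Result<usize, String> {
           if buf.len() < self.record_size {
               return Err("truncated record".to_string());
           }
           self.rows += 1;
           Ok(self.record_size)
       }
   
       /// Hand back the buffered row count and start a new batch.
       fn flush(&mut self) -> usize {
           std::mem::take(&mut self.rows)
       }
   }
   
   /// Decode records until the batch is full or the block's count is used up.
   /// Progress is kept in `block`, so the next call resumes where this one stopped.
   fn fill_batch(block: &mut BlockState, decoder: &mut ToyDecoder) -> Result<(), String> {
       while block.remaining > 0 && !decoder.batch_is_full() {
           let consumed = decoder.decode_one(&block.data[block.cursor..])?;
           block.cursor += consumed;
           block.remaining -= 1;
       }
       Ok(())
   }
   ```
   
   With an empty payload, `remaining: 5`, and a `batch_size` of 2, successive 
   `fill_batch` plus `flush` calls in this sketch yield 2, 2, and 1 rows, then stop.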
   
   I just need to research the best way to go about this so that it handles all 
   edge cases. I'm planning to follow up on this work (before it's public) to both 
   resolve this scenario and enable schema evolution. I didn't want this PR to get 
   too large and complicated, but I'm also happy to add the logic here.
   
   Also, @tustvold, I'm not sure what your original vision was for this reader. I 
   want to make sure I stay true to that as well.


