jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2183842537


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   The core of the issue, and the reason for this code, is a specific edge case
   allowed by the Avro specification: records with a zero-byte encoding. This can
   happen, for instance, with a record that has no fields or whose fields are all
   of the `null` type.
   
   The current approach has a significant drawback: if a data block mixes
   zero-sized and non-zero-sized records, this logic skips every record that
   appears after the first zero-sized one, which is data loss. It is a deliberate
   trade-off, preventing a hang at the cost of potential data loss in this
   specific and rare edge case. I planned to address it properly in a follow-up
   PR, before this is publicly exposed, via a new `records_read` counter based
   approach (a rough sketch follows). If needed, though, I can add that to this PR as well.
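   
   For context, here is a rough sketch of the counter-based idea (not the code in
   this PR). It assumes the row decoder exposes a hypothetical `records_read()`
   counter and that the loop is driven by the block's record count from the Avro
   block header rather than by byte progress, so zero-byte records still advance
   the loop without skipping the rest of the block:
   
   ```rust
   // Hypothetical follow-up sketch, not this PR's code: drive decoding off the
   // block's record count instead of byte progress. Assumes a `records_read()`
   // counter on the row decoder; `block_count` is the object count read from
   // the Avro block header.
   let mut remaining = block_count;
   while remaining > 0 && !self.decoder.batch_is_full() {
       let before = self.decoder.records_read();
       let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
       self.block_cursor += consumed;
       let read = self.decoder.records_read() - before;
       if read == 0 {
           // No bytes consumed and no records decoded: genuinely stuck.
           return Err(ArrowError::ParseError(
               "Could not decode records from Avro block".to_string(),
           ));
       }
       // Zero-byte records advance `records_read` even though `consumed` is 0,
       // so later non-empty records in the same block are not skipped.
       remaining -= read;
   }
   ```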


