scovich commented on code in PR #8100:
URL: https://github.com/apache/arrow-rs/pull/8100#discussion_r2266533676


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -173,36 +173,29 @@ impl Decoder {
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
         let mut total_consumed = 0usize;
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if !self.awaiting_body {
-                if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
-                    if n == 0 {
-                        break;
-                    }
-                    total_consumed += n;
-                    self.awaiting_body = true;
-                    self.apply_pending_schema_if_batch_empty();
-                    if self.remaining_capacity == 0 {
-                        break;
+            if self.awaiting_body {
+                match self.active_decoder.decode(&data[total_consumed..], 1) {
+                    Ok(n) => {
+                        self.remaining_capacity -= 1;
+                        total_consumed += n;
+                        self.awaiting_body = false;
+                        continue;
                     }
-                }
+                    Err(ref e) if is_incomplete_data(e) => return Ok(total_consumed),

Review Comment:
   ```suggestion
                       Err(ref e) if is_incomplete_data(e) => break,
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -130,6 +129,16 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
     })
 }
 
+fn is_incomplete_data(err: &ArrowError) -> bool {
+    matches!(
+        err,
+        ArrowError::ParseError(msg)
+            if msg.contains("Unexpected EOF")
+            || msg.contains("bad varint")
+            || msg.contains("offset overflow")

Review Comment:
   Hmm, after thinking more about this over the weekend:
   
   Trying to interpret or suppress these errors will almost certainly make the decoder brittle in the face of malformed input bytes that legitimately trigger them. For example, we could put the decoder in an infinite loop where it keeps fetching more and more bytes in the hope of eliminating the error, when the error is fully contained in the existing buffer.
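
One way to sidestep the string matching, sketched here under assumptions: have the low-level reader report "need more bytes" as a distinct value rather than as an error, so that only malformed input surfaces as `Err`. The `read_varint` function and `DecodeError` type below are hypothetical and are not arrow-avro APIs; the sketch reads a plain unsigned varint (no zigzag step) purely to show the shape.

```rust
/// Hypothetical error type: only input that is malformed no matter how many
/// more bytes arrive is reported as an error.
#[derive(Debug, PartialEq)]
enum DecodeError {
    /// The varint encodes more than 64 bits.
    VarintOverflow,
}

/// Reads one unsigned varint (7 payload bits per byte, high bit set means
/// "more bytes follow") from the front of `buf`.
///
/// * `Ok(Some((value, consumed)))`: a complete varint was decoded.
/// * `Ok(None)`: `buf` ended before the varint terminated; more bytes could fix it.
/// * `Err(_)`: the bytes already in `buf` are invalid; more bytes cannot fix it.
fn read_varint(buf: &[u8]) -> Result<Option<(u64, usize)>, DecodeError> {
    let mut value = 0u64;
    for (i, &byte) in buf.iter().enumerate() {
        // The 10th byte may only contribute the single remaining bit and
        // must terminate the varint.
        if i == 9 && byte > 0x01 {
            return Err(DecodeError::VarintOverflow);
        }
        value |= u64::from(byte & 0x7f) << (7 * i as u32);
        if byte & 0x80 == 0 {
            return Ok(Some((value, i + 1)));
        }
    }
    // Ran out of input while the continuation bit was still set.
    Ok(None)
}
```

With this shape, a caller can `break` on `Ok(None)` and wait for more input, while a malformed varint fails immediately instead of triggering another fetch.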



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        self.apply_pending_schema();

Review Comment:
   `flush` and `flush_block` are identical except for this call to `self.apply_pending_schema`?
   Is there a way to deduplicate the code? Maybe a `flush_internal` that takes a boolean argument (which the compiler would aggressively inline away as if it were a generic parameter)?
   
   Or just call `self.apply_pending_schema` unconditionally, knowing it should be a no-op during block decoding because `self.pending_schema` is always `None`?
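
A minimal sketch of the `flush_internal` idea, assuming a toy `Decoder` whose fields only mirror the names in the diff. The `rows`, `pending_schema: Option<String>` and `active_schema` fields are simplified stand-ins, not the real arrow-avro types, and the real methods return `Result<Option<RecordBatch>, ArrowError>`.

```rust
/// Toy stand-in for the real `Decoder`; types are simplified for illustration.
struct Decoder {
    batch_size: usize,
    remaining_capacity: usize,
    rows: Vec<u64>,                 // stand-in for the active record decoder
    pending_schema: Option<String>, // stand-in for a pending schema switch
    active_schema: String,
}

impl Decoder {
    /// True when no rows have been decoded into the current batch.
    fn batch_is_empty(&self) -> bool {
        self.remaining_capacity == self.batch_size
    }

    /// Switches to the pending schema, if any; a no-op otherwise.
    fn apply_pending_schema(&mut self) {
        if let Some(schema) = self.pending_schema.take() {
            self.active_schema = schema;
        }
    }

    /// Shared body of `flush` and `flush_block`; `apply_schema` selects
    /// whether a pending schema is applied after the batch is produced.
    fn flush_internal(&mut self, apply_schema: bool) -> Option<Vec<u64>> {
        if self.batch_is_empty() {
            return None;
        }
        let batch = std::mem::take(&mut self.rows);
        self.remaining_capacity = self.batch_size;
        if apply_schema {
            self.apply_pending_schema();
        }
        Some(batch)
    }

    pub fn flush(&mut self) -> Option<Vec<u64>> {
        self.flush_internal(true)
    }

    pub fn flush_block(&mut self) -> Option<Vec<u64>> {
        self.flush_internal(false)
    }
}
```

If `pending_schema` really is always `None` during block decoding, the flag becomes redundant and both entry points could share a single unconditional body.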



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();

Review Comment:
   Also, we have quite a few places that could benefit from a small helper method:
   ```rust
   fn batch_is_empty(&self) -> bool {
       self.remaining_capacity == self.batch_size
   }
   ```
   ... which could improve readability, e.g. 
   ```suggestion
           if self.batch_is_empty() {
               self.apply_pending_schema();
           }
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();

Review Comment:
   ```suggestion
           if self.remaining_capacity == self.batch_size {
               self.apply_pending_schema();
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

