This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 49041eb  chore: parse_ipc_message should return exception instead of return None if parse failed (#97)
49041eb is described below

commit 49041eb873830e11658e1993602c673a5e638a13
Author: Keith Lee <[email protected]>
AuthorDate: Sat Dec 20 03:37:01 2025 +0000

    chore: parse_ipc_message should return exception instead of return None if parse failed (#97)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 crates/fluss/src/error.rs        |   2 +-
 crates/fluss/src/record/arrow.rs | 112 +++++++++++++++++++++++++++++----------
 2 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index b1d5d13..63438b1 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -39,7 +39,7 @@ pub enum Error {
     #[error("Row convert error")]
     RowConvertError(String),
 
-    #[error("arrow error")]
+    #[error("Arrow error: {0}")]
     ArrowError(#[from] ArrowError),
 
     #[error("Write error: {0}")]
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index f079f09..6e8cb55 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -34,6 +34,7 @@ use arrow::{
         writer::StreamWriter,
     },
 };
+use arrow_schema::ArrowError::ParseError;
 use arrow_schema::SchemaRef;
 use arrow_schema::{DataType as ArrowDataType, Field};
 use byteorder::WriteBytesExt;
@@ -489,19 +490,15 @@ impl<'a> LogRecordBatch<'a> {
         let data = &self.data[RECORDS_OFFSET..];
 
         let record_batch = read_context.record_batch(data)?;
-        let log_record_iterator = match record_batch {
-            None => LogRecordIterator::empty(),
-            Some(record_batch) => {
-                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
-                LogRecordIterator::Arrow(ArrowLogRecordIterator {
-                    reader: arrow_reader,
-                    base_offset: self.base_log_offset(),
-                    timestamp: self.commit_timestamp(),
-                    row_id: 0,
-                    change_type: ChangeType::AppendOnly,
-                })
-            }
-        };
+        let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+        let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator {
+            reader: arrow_reader,
+            base_offset: self.base_log_offset(),
+            timestamp: self.commit_timestamp(),
+            row_id: 0,
+            change_type: ChangeType::AppendOnly,
+        });
+
         Ok(log_record_iterator)
     }
 
@@ -542,15 +539,16 @@ impl<'a> LogRecordBatch<'a> {
 /// * `data` - The byte slice containing the IPC message.
 ///
 /// # Returns
-/// Returns `Some((batch_metadata, body_buffer, version))` on success:
+/// Returns `Ok((batch_metadata, body_buffer, version))` on success:
 /// - `batch_metadata`: The RecordBatch metadata from the IPC message.
 /// - `body_buffer`: The buffer containing the record batch body data.
 /// - `version`: The Arrow IPC metadata version.
 ///
-/// Returns `None` if the data is malformed or too short.
+/// Returns `Err(arrow_error)` on errors
+/// - `arrow_error`: Error details e.g. malformed, too short or bad continuation marker.
 fn parse_ipc_message(
     data: &[u8],
-) -> Option<(
+) -> Result<(
     arrow::ipc::RecordBatch<'_>,
     Buffer,
     arrow::ipc::MetadataVersion,
@@ -558,30 +556,38 @@ fn parse_ipc_message(
     const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
 
     if data.len() < 8 {
-        return None;
+        Err(ParseError(format!("Invalid data length: {}", data.len())))?
     }
 
     let continuation = LittleEndian::read_u32(&data[0..4]);
     let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
 
     if continuation != CONTINUATION_MARKER {
-        return None;
+        Err(ParseError(format!(
+            "Invalid continuation marker: {continuation}"
+        )))?
     }
 
     if data.len() < 8 + metadata_size {
-        return None;
+        Err(ParseError(format!(
+            "Invalid data length. Remaining data length {} is shorter than specified size {}",
+            data.len() - 8,
+            metadata_size
+        )))?
     }
 
     let metadata_bytes = &data[8..8 + metadata_size];
-    let message = root_as_message(metadata_bytes).ok()?;
-    let batch_metadata = message.header_as_record_batch()?;
+    let message = root_as_message(metadata_bytes).map_err(|err| ParseError(err.to_string()))?;
+    let batch_metadata = message
+        .header_as_record_batch()
+        .ok_or(ParseError(String::from("Not a record batch")))?;
 
     let metadata_padded_size = (metadata_size + 7) & !7;
     let body_start = 8 + metadata_padded_size;
     let body_data = &data[body_start..];
     let body_buffer = Buffer::from(body_data);
 
-    Some((batch_metadata, body_buffer, message.version()))
+    Ok((batch_metadata, body_buffer, message.version()))
 }
 
 pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
@@ -602,7 +608,7 @@ pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
             SchemaRef::new(arrow_schema::Schema::new(fields))
         }
         _ => {
-            panic!("must be row data tyoe.")
+            panic!("must be row data type.")
         }
     }
 }
@@ -796,11 +802,8 @@ impl ReadContext {
             .map(|p| p.ordered_fields.as_slice())
     }
 
-    pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
-        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
-            Some(result) => result,
-            None => return Ok(None),
-        };
+    pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
+        let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
 
         // the record batch from server must be ordered by field pos,
         // according to project to decide what arrow schema to use
@@ -837,7 +840,7 @@ impl ReadContext {
             }
             _ => record_batch,
         };
-        Ok(Some(record_batch))
+        Ok(record_batch)
     }
 
     pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
@@ -1076,4 +1079,55 @@ mod tests {
     fn test_timestamp_ltz_invalid_precision() {
         to_arrow_type(&DataTypes::timestamp_ltz_with_precision(10));
     }
+
+    #[test]
+    fn test_parse_ipc_message() {
+        let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]);
+        let result = parse_ipc_message(empty_body);
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            String::from("Arrow error: Parser error: Range [0, 4) is out of bounds.\n\n")
+        );
+
+        let invalid_data = &[];
+        assert_eq!(
+            parse_ipc_message(invalid_data).unwrap_err().to_string(),
+            String::from("Arrow error: Parser error: Invalid data length: 0")
+        );
+
+        let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]);
+        assert_eq!(
+            parse_ipc_message(data_with_invalid_continuation)
+                .unwrap_err()
+                .to_string(),
+            String::from("Arrow error: Parser error: Invalid continuation marker: 1")
+        );
+
+        let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]);
+        assert_eq!(
+            parse_ipc_message(data_with_invalid_length)
+                .unwrap_err()
+                .to_string(),
+            String::from(
+                "Arrow error: Parser error: Invalid data length. \
+                   Remaining data length 0 is shorter than specified size 1"
+            )
+        );
+
+        let data_with_invalid_length = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]);
+        assert_eq!(
+            parse_ipc_message(data_with_invalid_length)
+                .unwrap_err()
+                .to_string(),
+            String::from("Arrow error: Parser error: Not a record batch")
+        );
+    }
+
+    fn le_bytes(vals: &[u32]) -> Vec<u8> {
+        let mut out = Vec::with_capacity(vals.len() * 4);
+        for &v in vals {
+            out.extend_from_slice(&v.to_le_bytes());
+        }
+        out
+    }
 }

Reply via email to