This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c7ad66d  chore: Fix panic / crashes in Rust and C++ (#365)
c7ad66d is described below

commit c7ad66d5bd5918408618db9f214a03abc82857c8
Author: Keith Lee <[email protected]>
AuthorDate: Fri Feb 27 01:58:28 2026 +0000

    chore: Fix panic / crashes in Rust and C++ (#365)
---
 bindings/cpp/src/lib.rs                            |  80 ++--
 bindings/cpp/src/types.rs                          |  34 +-
 bindings/python/src/table.rs                       |  56 ++-
 crates/examples/src/example_kv_table.rs            |   8 +-
 .../examples/src/example_partitioned_kv_table.rs   |  10 +-
 crates/examples/src/example_table.rs               |   6 +-
 crates/fluss/src/client/table/append.rs            |  17 +
 crates/fluss/src/client/table/partition_getter.rs  |   2 +-
 crates/fluss/src/client/table/scanner.rs           |  10 +-
 crates/fluss/src/client/table/upsert.rs            |   2 +-
 crates/fluss/src/record/arrow.rs                   |   2 +-
 crates/fluss/src/record/kv/kv_record_batch.rs      |   2 +-
 .../fluss/src/record/kv/kv_record_batch_builder.rs |   8 +-
 crates/fluss/src/row/column.rs                     | 465 +++++++++++----------
 crates/fluss/src/row/compacted/compacted_row.rs    | 105 ++---
 .../fluss/src/row/encode/compacted_key_encoder.rs  |   2 +-
 crates/fluss/src/row/field_getter.rs               |  45 +-
 crates/fluss/src/row/mod.rs                        | 231 ++++++----
 crates/fluss/src/row/row_decoder.rs                |   8 +-
 crates/fluss/tests/integration/kv_table.rs         | 266 +++++++-----
 crates/fluss/tests/integration/log_table.rs        | 200 +++++++--
 .../fluss/tests/integration/table_remote_scan.rs   |  14 +-
 22 files changed, 977 insertions(+), 596 deletions(-)

diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9fbdc8f..32dbf7d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -1784,7 +1784,7 @@ mod row_reader {
         allowed: impl FnOnce(&fcore::metadata::DataType) -> bool,
     ) -> Result<&'a fcore::metadata::DataType, String> {
         let col = get_column(columns, field)?;
-        if row.is_null_at(field) {
+        if row.is_null_at(field).map_err(|e| e.to_string())? {
             return Err(format!("field {field} is null"));
         }
         let dt = col.data_type();
@@ -1812,7 +1812,7 @@ mod row_reader {
         field: usize,
     ) -> Result<bool, String> {
         get_column(columns, field)?;
-        Ok(row.is_null_at(field))
+        row.is_null_at(field).map_err(|e| e.to_string())
     }
 
     pub fn get_bool(
@@ -1823,7 +1823,7 @@ mod row_reader {
         validate(row, columns, field, "get_bool", |dt| {
             matches!(dt, fcore::metadata::DataType::Boolean(_))
         })?;
-        Ok(row.get_boolean(field))
+        row.get_boolean(field).map_err(|e| e.to_string())
     }
 
     pub fn get_i32(
@@ -1839,11 +1839,17 @@ mod row_reader {
                     | fcore::metadata::DataType::Int(_)
             )
         })?;
-        Ok(match dt {
-            fcore::metadata::DataType::TinyInt(_) => row.get_byte(field) as 
i32,
-            fcore::metadata::DataType::SmallInt(_) => row.get_short(field) as 
i32,
-            _ => row.get_int(field),
-        })
+        match dt {
+            fcore::metadata::DataType::TinyInt(_) => row
+                .get_byte(field)
+                .map(|v| v as i32)
+                .map_err(|e| e.to_string()),
+            fcore::metadata::DataType::SmallInt(_) => row
+                .get_short(field)
+                .map(|v| v as i32)
+                .map_err(|e| e.to_string()),
+            _ => row.get_int(field).map_err(|e| e.to_string()),
+        }
     }
 
     pub fn get_i64(
@@ -1854,7 +1860,7 @@ mod row_reader {
         validate(row, columns, field, "get_i64", |dt| {
             matches!(dt, fcore::metadata::DataType::BigInt(_))
         })?;
-        Ok(row.get_long(field))
+        row.get_long(field).map_err(|e| e.to_string())
     }
 
     pub fn get_f32(
@@ -1865,7 +1871,7 @@ mod row_reader {
         validate(row, columns, field, "get_f32", |dt| {
             matches!(dt, fcore::metadata::DataType::Float(_))
         })?;
-        Ok(row.get_float(field))
+        row.get_float(field).map_err(|e| e.to_string())
     }
 
     pub fn get_f64(
@@ -1876,7 +1882,7 @@ mod row_reader {
         validate(row, columns, field, "get_f64", |dt| {
             matches!(dt, fcore::metadata::DataType::Double(_))
         })?;
-        Ok(row.get_double(field))
+        row.get_double(field).map_err(|e| e.to_string())
     }
 
     pub fn get_str<'a>(
@@ -1890,10 +1896,12 @@ mod row_reader {
                 fcore::metadata::DataType::Char(_) | 
fcore::metadata::DataType::String(_)
             )
         })?;
-        Ok(match dt {
-            fcore::metadata::DataType::Char(ct) => row.get_char(field, 
ct.length() as usize),
-            _ => row.get_string(field),
-        })
+        match dt {
+            fcore::metadata::DataType::Char(ct) => row
+                .get_char(field, ct.length() as usize)
+                .map_err(|e| e.to_string()),
+            _ => row.get_string(field).map_err(|e| e.to_string()),
+        }
     }
 
     pub fn get_bytes<'a>(
@@ -1907,10 +1915,12 @@ mod row_reader {
                 fcore::metadata::DataType::Binary(_) | 
fcore::metadata::DataType::Bytes(_)
             )
         })?;
-        Ok(match dt {
-            fcore::metadata::DataType::Binary(bt) => row.get_binary(field, 
bt.length()),
-            _ => row.get_bytes(field),
-        })
+        match dt {
+            fcore::metadata::DataType::Binary(bt) => row
+                .get_binary(field, bt.length())
+                .map_err(|e| e.to_string()),
+            _ => row.get_bytes(field).map_err(|e| e.to_string()),
+        }
     }
 
     pub fn get_date_days(
@@ -1921,7 +1931,9 @@ mod row_reader {
         validate(row, columns, field, "get_date_days", |dt| {
             matches!(dt, fcore::metadata::DataType::Date(_))
         })?;
-        Ok(row.get_date(field).get_inner())
+        row.get_date(field)
+            .map(|d| d.get_inner())
+            .map_err(|e| e.to_string())
     }
 
     pub fn get_time_millis(
@@ -1932,7 +1944,9 @@ mod row_reader {
         validate(row, columns, field, "get_time_millis", |dt| {
             matches!(dt, fcore::metadata::DataType::Time(_))
         })?;
-        Ok(row.get_time(field).get_inner())
+        row.get_time(field)
+            .map(|t| t.get_inner())
+            .map_err(|e| e.to_string())
     }
 
     pub fn get_ts_millis(
@@ -1948,12 +1962,14 @@ mod row_reader {
             )
         })?;
         match dt {
-            fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
+            fcore::metadata::DataType::TimestampLTz(ts) => row
                 .get_timestamp_ltz(field, ts.precision())
-                .get_epoch_millisecond()),
-            fcore::metadata::DataType::Timestamp(ts) => Ok(row
+                .map(|v| v.get_epoch_millisecond())
+                .map_err(|e| e.to_string()),
+            fcore::metadata::DataType::Timestamp(ts) => row
                 .get_timestamp_ntz(field, ts.precision())
-                .get_millisecond()),
+                .map(|v| v.get_millisecond())
+                .map_err(|e| e.to_string()),
             dt => Err(format!("get_ts_millis: unexpected type {dt}")),
         }
     }
@@ -1971,12 +1987,14 @@ mod row_reader {
             )
         })?;
         match dt {
-            fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
+            fcore::metadata::DataType::TimestampLTz(ts) => row
                 .get_timestamp_ltz(field, ts.precision())
-                .get_nano_of_millisecond()),
-            fcore::metadata::DataType::Timestamp(ts) => Ok(row
+                .map(|v| v.get_nano_of_millisecond())
+                .map_err(|e| e.to_string()),
+            fcore::metadata::DataType::Timestamp(ts) => row
                 .get_timestamp_ntz(field, ts.precision())
-                .get_nano_of_millisecond()),
+                .map(|v| v.get_nano_of_millisecond())
+                .map_err(|e| e.to_string()),
             dt => Err(format!("get_ts_nanos: unexpected type {dt}")),
         }
     }
@@ -1998,7 +2016,9 @@ mod row_reader {
         })?;
         match dt {
             fcore::metadata::DataType::Decimal(dd) => {
-                let decimal = row.get_decimal(field, dd.precision() as usize, 
dd.scale() as usize);
+                let decimal = row
+                    .get_decimal(field, dd.precision() as usize, dd.scale() as 
usize)
+                    .map_err(|e| e.to_string())?;
                 Ok(decimal.to_big_decimal().to_string())
             }
             dt => Err(format!("get_decimal_str: unexpected type {dt}")),
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 073a168..f8efe67 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -371,42 +371,42 @@ pub fn compacted_row_to_owned(
     let mut out = fcore::row::GenericRow::new(columns.len());
 
     for (i, col) in columns.iter().enumerate() {
-        if row.is_null_at(i) {
+        if row.is_null_at(i)? {
             out.set_field(i, Datum::Null);
             continue;
         }
 
         let datum = match col.data_type() {
-            fcore::metadata::DataType::Boolean(_) => 
Datum::Bool(row.get_boolean(i)),
-            fcore::metadata::DataType::TinyInt(_) => 
Datum::Int8(row.get_byte(i)),
-            fcore::metadata::DataType::SmallInt(_) => 
Datum::Int16(row.get_short(i)),
-            fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)),
-            fcore::metadata::DataType::BigInt(_) => 
Datum::Int64(row.get_long(i)),
-            fcore::metadata::DataType::Float(_) => 
Datum::Float32(row.get_float(i).into()),
-            fcore::metadata::DataType::Double(_) => 
Datum::Float64(row.get_double(i).into()),
+            fcore::metadata::DataType::Boolean(_) => 
Datum::Bool(row.get_boolean(i)?),
+            fcore::metadata::DataType::TinyInt(_) => 
Datum::Int8(row.get_byte(i)?),
+            fcore::metadata::DataType::SmallInt(_) => 
Datum::Int16(row.get_short(i)?),
+            fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?),
+            fcore::metadata::DataType::BigInt(_) => 
Datum::Int64(row.get_long(i)?),
+            fcore::metadata::DataType::Float(_) => 
Datum::Float32(row.get_float(i)?.into()),
+            fcore::metadata::DataType::Double(_) => 
Datum::Float64(row.get_double(i)?.into()),
             fcore::metadata::DataType::String(_) => {
-                Datum::String(Cow::Owned(row.get_string(i).to_string()))
+                Datum::String(Cow::Owned(row.get_string(i)?.to_string()))
             }
             fcore::metadata::DataType::Bytes(_) => {
-                Datum::Blob(Cow::Owned(row.get_bytes(i).to_vec()))
+                Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec()))
             }
-            fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)),
-            fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)),
+            fcore::metadata::DataType::Date(_) => 
Datum::Date(row.get_date(i)?),
+            fcore::metadata::DataType::Time(_) => 
Datum::Time(row.get_time(i)?),
             fcore::metadata::DataType::Timestamp(dt) => {
-                Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision()))
+                Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?)
             }
             fcore::metadata::DataType::TimestampLTz(dt) => {
-                Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision()))
+                Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?)
             }
             fcore::metadata::DataType::Decimal(dt) => {
-                let decimal = row.get_decimal(i, dt.precision() as usize, 
dt.scale() as usize);
+                let decimal = row.get_decimal(i, dt.precision() as usize, 
dt.scale() as usize)?;
                 Datum::Decimal(decimal)
             }
             fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned(
-                row.get_char(i, dt.length() as usize).to_string(),
+                row.get_char(i, dt.length() as usize)?.to_string(),
             )),
             fcore::metadata::DataType::Binary(dt) => {
-                Datum::Blob(Cow::Owned(row.get_binary(i, 
dt.length()).to_vec()))
+                Datum::Blob(Cow::Owned(row.get_binary(i, 
dt.length())?.to_vec()))
             }
             other => return Err(anyhow!("Unsupported data type for column {i}: 
{other:?}")),
         };
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index bc2e956..660cd6b 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1256,91 +1256,119 @@ pub fn datum_to_python_value(
     use fcore::metadata::DataType;
 
     // Check for null first
-    if row.is_null_at(pos) {
+    if row
+        .is_null_at(pos)
+        .map_err(|e| FlussError::from_core_error(&e))?
+    {
         return Ok(py.None());
     }
 
     match data_type {
         DataType::Boolean(_) => Ok(row
             .get_boolean(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::TinyInt(_) => Ok(row
             .get_byte(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::SmallInt(_) => Ok(row
             .get_short(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::Int(_) => Ok(row
             .get_int(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::BigInt(_) => Ok(row
             .get_long(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::Float(_) => Ok(row
             .get_float(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::Double(_) => Ok(row
             .get_double(pos)
+            .map_err(|e| FlussError::from_core_error(&e))?
             .into_pyobject(py)?
             .to_owned()
             .into_any()
             .unbind()),
         DataType::String(_) => {
-            let s = row.get_string(pos);
+            let s = row
+                .get_string(pos)
+                .map_err(|e| FlussError::from_core_error(&e))?;
             Ok(s.into_pyobject(py)?.into_any().unbind())
         }
         DataType::Char(char_type) => {
-            let s = row.get_char(pos, char_type.length() as usize);
+            let s = row
+                .get_char(pos, char_type.length() as usize)
+                .map_err(|e| FlussError::from_core_error(&e))?;
             Ok(s.into_pyobject(py)?.into_any().unbind())
         }
         DataType::Bytes(_) => {
-            let b = row.get_bytes(pos);
+            let b = row
+                .get_bytes(pos)
+                .map_err(|e| FlussError::from_core_error(&e))?;
             Ok(PyBytes::new(py, b).into_any().unbind())
         }
         DataType::Binary(binary_type) => {
-            let b = row.get_binary(pos, binary_type.length());
+            let b = row
+                .get_binary(pos, binary_type.length())
+                .map_err(|e| FlussError::from_core_error(&e))?;
             Ok(PyBytes::new(py, b).into_any().unbind())
         }
         DataType::Decimal(decimal_type) => {
-            let decimal = row.get_decimal(
-                pos,
-                decimal_type.precision() as usize,
-                decimal_type.scale() as usize,
-            );
+            let decimal = row
+                .get_decimal(
+                    pos,
+                    decimal_type.precision() as usize,
+                    decimal_type.scale() as usize,
+                )
+                .map_err(|e| FlussError::from_core_error(&e))?;
             rust_decimal_to_python(py, &decimal)
         }
         DataType::Date(_) => {
-            let date = row.get_date(pos);
+            let date = row
+                .get_date(pos)
+                .map_err(|e| FlussError::from_core_error(&e))?;
             rust_date_to_python(py, date)
         }
         DataType::Time(_) => {
-            let time = row.get_time(pos);
+            let time = row
+                .get_time(pos)
+                .map_err(|e| FlussError::from_core_error(&e))?;
             rust_time_to_python(py, time)
         }
         DataType::Timestamp(ts_type) => {
-            let ts = row.get_timestamp_ntz(pos, ts_type.precision());
+            let ts = row
+                .get_timestamp_ntz(pos, ts_type.precision())
+                .map_err(|e| FlussError::from_core_error(&e))?;
             rust_timestamp_ntz_to_python(py, ts)
         }
         DataType::TimestampLTz(ts_type) => {
-            let ts = row.get_timestamp_ltz(pos, ts_type.precision());
+            let ts = row
+                .get_timestamp_ltz(pos, ts_type.precision())
+                .map_err(|e| FlussError::from_core_error(&e))?;
             rust_timestamp_ltz_to_python(py, ts)
         }
         _ => Err(FlussError::new_err(format!(
diff --git a/crates/examples/src/example_kv_table.rs 
b/crates/examples/src/example_kv_table.rs
index 90788b1..8fb60ba 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -75,8 +75,8 @@ pub async fn main() -> Result<()> {
         let row = result.get_single_row()?.unwrap();
         println!(
             "Found id={id}: name={}, age={}",
-            row.get_string(1),
-            row.get_long(2)
+            row.get_string(1)?,
+            row.get_long(2)?
         );
     }
 
@@ -92,8 +92,8 @@ pub async fn main() -> Result<()> {
     let row = result.get_single_row()?.unwrap();
     println!(
         "Verified update: name={}, age={}",
-        row.get_string(1),
-        row.get_long(2)
+        row.get_string(1)?,
+        row.get_long(2)?
     );
 
     println!("\n=== Deleting ===");
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
index e047178..9cd2e7d 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -90,9 +90,9 @@ pub async fn main() -> Result<()> {
         let row = result.get_single_row()?.unwrap();
         println!(
             "Found id={id}: region={}, zone={}, score={}",
-            row.get_string(1),
-            row.get_long(2),
-            row.get_long(3)
+            row.get_string(1)?,
+            row.get_long(2)?,
+            row.get_long(3)?
         );
     }
 
@@ -109,8 +109,8 @@ pub async fn main() -> Result<()> {
     let row = result.get_single_row()?.unwrap();
     println!(
         "Verified update: region={}, zone={}",
-        row.get_string(1),
-        row.get_long(2)
+        row.get_string(1)?,
+        row.get_long(2)?
     );
 
     println!("\n=== Deleting ===");
diff --git a/crates/examples/src/example_table.rs 
b/crates/examples/src/example_table.rs
index cfe1627..e4ad1fb 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -83,9 +83,9 @@ pub async fn main() -> Result<()> {
             let row = record.row();
             println!(
                 "{{{}, {}, {}}}@{}",
-                row.get_int(0),
-                row.get_string(1),
-                row.get_long(2),
+                row.get_int(0)?,
+                row.get_string(1)?,
+                row.get_long(2)?,
                 record.offset()
             );
         }
diff --git a/crates/fluss/src/client/table/append.rs 
b/crates/fluss/src/client/table/append.rs
index 942253f..a58433f 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -17,6 +17,7 @@
 
 use crate::client::table::partition_getter::{PartitionGetter, 
get_physical_path};
 use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
+use crate::error::Error::IllegalArgument;
 use crate::error::Result;
 use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
 use crate::row::{ColumnarRow, InternalRow};
@@ -69,6 +70,21 @@ pub struct AppendWriter {
 }
 
 impl AppendWriter {
+    fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
+        let expected = self.table_info.get_row_type().fields().len();
+        if row.get_field_count() != expected {
+            return Err(IllegalArgument {
+                message: format!(
+                    "The field count of the row does not match the table 
schema. \
+                     Expected: {}, Actual: {}",
+                    expected,
+                    row.get_field_count()
+                ),
+            });
+        }
+        Ok(())
+    }
+
     /// Appends a row to the table.
     ///
     /// This method returns a [`WriteResultFuture`] immediately after queueing 
the write,
@@ -81,6 +97,7 @@ impl AppendWriter {
     /// A [`WriteResultFuture`] that can be awaited to wait for server 
acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure 
delivery).
     pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> 
{
+        self.check_field_count(row)?;
         let physical_table_path = Arc::new(get_physical_path(
             &self.table_path,
             self.partition_getter.as_ref(),
diff --git a/crates/fluss/src/client/table/partition_getter.rs 
b/crates/fluss/src/client/table/partition_getter.rs
index a1aad2d..1115ded 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -87,7 +87,7 @@ impl PartitionGetter {
         let mut partition_values = Vec::with_capacity(self.partitions.len());
 
         for (data_type, field_getter) in &self.partitions {
-            let value = field_getter.get_field(row);
+            let value = field_getter.get_field(row)?;
             if value.is_null() {
                 return Err(IllegalArgument {
                     message: "Partition value shouldn't be null.".to_string(),
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 4b6f809..3ec9106 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -114,9 +114,9 @@ impl<'a> TableScan<'a> {
     ///         let row = record.row();
     ///         println!(
     ///             "{{{}, {}, {}}}@{}",
-    ///             row.get_int(0),
-    ///             row.get_string(2),
-    ///             row.get_string(3),
+    ///             row.get_int(0)?,
+    ///             row.get_string(2)?,
+    ///             row.get_string(3)?,
     ///             record.offset()
     ///         );
     ///     }
@@ -188,8 +188,8 @@ impl<'a> TableScan<'a> {
     ///         let row = record.row();
     ///         println!(
     ///             "{{{}, {}}}@{}",
-    ///             row.get_int(0),
-    ///             row.get_string(1),
+    ///             row.get_int(0)?,
+    ///             row.get_string(1)?,
     ///             record.offset()
     ///         );
     ///     }
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index 7057b90..52ec37b 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -328,7 +328,7 @@ impl UpsertWriter {
         })?;
         encoder.start_new_row()?;
         for (pos, field_getter) in self.field_getters.iter().enumerate() {
-            let datum = field_getter.get_field(row);
+            let datum = field_getter.get_field(row)?;
             encoder.encode_field(pos, datum)?;
         }
         encoder.finish_row()
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 7fb9d34..ea27836 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -355,7 +355,7 @@ impl ArrowRecordBatchInnerBuilder for 
RowAppendRecordBatchBuilder {
 
     fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
         for (idx, getter) in self.field_getters.iter().enumerate() {
-            let datum = getter.get_field(row);
+            let datum = getter.get_field(row)?;
             let field_type = self.table_schema.field(idx).data_type();
             let builder =
                 self.arrow_column_builders
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs 
b/crates/fluss/src/record/kv/kv_record_batch.rs
index eb89d69..14ff2e9 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -445,7 +445,7 @@ mod tests {
         assert_eq!(record1.key().as_ref(), key1);
         assert!(!record1.is_deletion());
         let row1 = record1.row(&*decoder).unwrap();
-        assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]);
+        assert_eq!(row1.get_bytes(0).unwrap(), &[1, 2, 3, 4, 5]);
 
         let record2 = iter.next().unwrap().unwrap();
         assert_eq!(record2.key().as_ref(), key2);
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 8370764..0e80633 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -555,14 +555,14 @@ mod tests {
                 1 => {
                     assert_eq!(rec.key().as_ref(), key1);
                     let row = rec.row(&*decoder).unwrap();
-                    assert_eq!(row.get_int(0), 42);
-                    assert_eq!(row.get_string(1), "hello");
+                    assert_eq!(row.get_int(0)?, 42);
+                    assert_eq!(row.get_string(1)?, "hello");
                 }
                 2 => {
                     assert_eq!(rec.key().as_ref(), key2);
                     let row = rec.row(&*decoder).unwrap();
-                    assert_eq!(row.get_int(0), 100);
-                    assert_eq!(row.get_string(1), "world");
+                    assert_eq!(row.get_int(0)?, 100);
+                    assert_eq!(row.get_string(1)?, "world");
                 }
                 3 => {
                     assert_eq!(rec.key().as_ref(), key3);
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 50db32b..c07fe97 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -15,15 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
 use crate::row::InternalRow;
-use arrow::array::{
-    Array, AsArray, BinaryArray, Date32Array, Decimal128Array, 
FixedSizeBinaryArray, Float32Array,
-    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, 
StringArray,
-    Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, 
Time64NanosecondArray,
-    TimestampMicrosecondArray, TimestampMillisecondArray, 
TimestampNanosecondArray,
-    TimestampSecondArray,
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray};
+use arrow::datatypes::{
+    DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type, 
Float64Type, Int8Type,
+    Int16Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType,
+    Time64MicrosecondType, Time64NanosecondType, TimeUnit, 
TimestampMicrosecondType,
+    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
 };
-use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
 use std::sync::Arc;
 
 #[derive(Clone)]
@@ -59,6 +61,18 @@ impl ColumnarRow {
         &self.record_batch
     }
 
+    fn column(&self, pos: usize) -> Result<&Arc<dyn Array>> {
+        self.record_batch
+            .columns()
+            .get(pos)
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "column index {pos} out of bounds (batch has {} columns)",
+                    self.record_batch.num_columns()
+                ),
+            })
+    }
+
     /// Generic helper to read timestamp from Arrow, handling all TimeUnit 
conversions.
     /// Like Java, the precision parameter is ignored - conversion is 
determined by Arrow TimeUnit.
     fn read_timestamp_from_arrow<T>(
@@ -66,114 +80,133 @@ impl ColumnarRow {
         pos: usize,
         _precision: u32,
         construct_compact: impl FnOnce(i64) -> T,
-        construct_with_nanos: impl FnOnce(i64, i32) -> crate::error::Result<T>,
-    ) -> T {
-        let schema = self.record_batch.schema();
-        let arrow_field = schema.field(pos);
-        let column = self.record_batch.column(pos);
-
-        // Read value based on the actual Arrow timestamp type
-        let value = match arrow_field.data_type() {
-            ArrowDataType::Timestamp(TimeUnit::Second, _) => column
-                .as_any()
-                .downcast_ref::<TimestampSecondArray>()
-                .expect("Expected TimestampSecondArray")
-                .value(self.row_id),
-            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .expect("Expected TimestampMillisecondArray")
-                .value(self.row_id),
-            ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column
-                .as_any()
-                .downcast_ref::<TimestampMicrosecondArray>()
-                .expect("Expected TimestampMicrosecondArray")
-                .value(self.row_id),
-            ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column
-                .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()
-                .expect("Expected TimestampNanosecondArray")
-                .value(self.row_id),
-            other => panic!("Expected Timestamp column at position {pos}, got 
{other:?}"),
+        construct_with_nanos: impl FnOnce(i64, i32) -> Result<T>,
+    ) -> Result<T> {
+        let column = self.column(pos)?;
+
+        // Read value and time unit based on the actual Arrow timestamp type
+        let (value, time_unit) = match column.data_type() {
+            ArrowDataType::Timestamp(TimeUnit::Second, _) => (
+                column
+                    .as_primitive_opt::<TimestampSecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected TimestampSecondArray at 
position {pos}"),
+                    })?
+                    .value(self.row_id),
+                TimeUnit::Second,
+            ),
+            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => (
+                column
+                    .as_primitive_opt::<TimestampMillisecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected TimestampMillisecondArray 
at position {pos}"),
+                    })?
+                    .value(self.row_id),
+                TimeUnit::Millisecond,
+            ),
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => (
+                column
+                    .as_primitive_opt::<TimestampMicrosecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected TimestampMicrosecondArray 
at position {pos}"),
+                    })?
+                    .value(self.row_id),
+                TimeUnit::Microsecond,
+            ),
+            ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => (
+                column
+                    .as_primitive_opt::<TimestampNanosecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected TimestampNanosecondArray at 
position {pos}"),
+                    })?
+                    .value(self.row_id),
+                TimeUnit::Nanosecond,
+            ),
+            other => {
+                return Err(IllegalArgument {
+                    message: format!("expected Timestamp column at position 
{pos}, got {other:?}"),
+                });
+            }
         };
 
         // Convert based on Arrow TimeUnit
-        let (millis, nanos) = match arrow_field.data_type() {
-            ArrowDataType::Timestamp(time_unit, _) => match time_unit {
-                TimeUnit::Second => (value * 1000, 0),
-                TimeUnit::Millisecond => (value, 0),
-                TimeUnit::Microsecond => {
-                    // Use Euclidean division so that nanos is always 
non-negative,
-                    // even for timestamps before the Unix epoch.
-                    let millis = value.div_euclid(1000);
-                    let nanos = (value.rem_euclid(1000) * 1000) as i32;
-                    (millis, nanos)
-                }
-                TimeUnit::Nanosecond => {
-                    // Use Euclidean division so that nanos is always in [0, 
999_999].
-                    let millis = value.div_euclid(1_000_000);
-                    let nanos = value.rem_euclid(1_000_000) as i32;
-                    (millis, nanos)
-                }
-            },
-            _ => unreachable!(),
+        let (millis, nanos) = match time_unit {
+            TimeUnit::Second => (value * 1000, 0),
+            TimeUnit::Millisecond => (value, 0),
+            TimeUnit::Microsecond => {
+                // Use Euclidean division so that nanos is always non-negative,
+                // even for timestamps before the Unix epoch.
+                let millis = value.div_euclid(1000);
+                let nanos = (value.rem_euclid(1000) * 1000) as i32;
+                (millis, nanos)
+            }
+            TimeUnit::Nanosecond => {
+                // Use Euclidean division so that nanos is always in [0, 
999_999].
+                let millis = value.div_euclid(1_000_000);
+                let nanos = value.rem_euclid(1_000_000) as i32;
+                (millis, nanos)
+            }
         };
 
         if nanos == 0 {
-            construct_compact(millis)
+            Ok(construct_compact(millis))
         } else {
-            // nanos is guaranteed to be in valid range [0, 999_999] by 
arithmetic
-            construct_with_nanos(millis, nanos).expect("nanos in valid range 
by construction")
+            construct_with_nanos(millis, nanos)
         }
     }
 
     /// Read date value from Arrow Date32Array
-    fn read_date_from_arrow(&self, pos: usize) -> i32 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Date32Array>()
-            .expect("Expected Date32Array")
-            .value(self.row_id)
+    fn read_date_from_arrow(&self, pos: usize) -> Result<i32> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Date32Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected Date32Array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
     /// Read time value from Arrow Time32/Time64 arrays, converting to 
milliseconds
-    fn read_time_from_arrow(&self, pos: usize) -> i32 {
-        let schema = self.record_batch.schema();
-        let arrow_field = schema.field(pos);
-        let column = self.record_batch.column(pos);
+    fn read_time_from_arrow(&self, pos: usize) -> Result<i32> {
+        let column = self.column(pos)?;
 
-        match arrow_field.data_type() {
+        match column.data_type() {
             ArrowDataType::Time32(TimeUnit::Second) => {
                 let value = column
-                    .as_any()
-                    .downcast_ref::<Time32SecondArray>()
-                    .expect("Expected Time32SecondArray")
+                    .as_primitive_opt::<Time32SecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected Time32SecondArray at 
position {pos}"),
+                    })?
                     .value(self.row_id);
-                value * 1000 // Convert seconds to milliseconds
+                Ok(value * 1000) // Convert seconds to milliseconds
             }
-            ArrowDataType::Time32(TimeUnit::Millisecond) => column
-                .as_any()
-                .downcast_ref::<Time32MillisecondArray>()
-                .expect("Expected Time32MillisecondArray")
-                .value(self.row_id),
+            ArrowDataType::Time32(TimeUnit::Millisecond) => Ok(column
+                .as_primitive_opt::<Time32MillisecondType>()
+                .ok_or_else(|| IllegalArgument {
+                    message: format!("expected Time32MillisecondArray at 
position {pos}"),
+                })?
+                .value(self.row_id)),
             ArrowDataType::Time64(TimeUnit::Microsecond) => {
                 let value = column
-                    .as_any()
-                    .downcast_ref::<Time64MicrosecondArray>()
-                    .expect("Expected Time64MicrosecondArray")
+                    .as_primitive_opt::<Time64MicrosecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected Time64MicrosecondArray at 
position {pos}"),
+                    })?
                     .value(self.row_id);
-                (value / 1000) as i32 // Convert microseconds to milliseconds
+                Ok((value / 1000) as i32) // Convert microseconds to 
milliseconds
             }
             ArrowDataType::Time64(TimeUnit::Nanosecond) => {
                 let value = column
-                    .as_any()
-                    .downcast_ref::<Time64NanosecondArray>()
-                    .expect("Expected Time64NanosecondArray")
+                    .as_primitive_opt::<Time64NanosecondType>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("expected Time64NanosecondArray at 
position {pos}"),
+                    })?
                     .value(self.row_id);
-                (value / 1_000_000) as i32 // Convert nanoseconds to 
milliseconds
+                Ok((value / 1_000_000) as i32) // Convert nanoseconds to 
milliseconds
             }
-            other => panic!("Expected Time column at position {pos}, got 
{other:?}"),
+            other => Err(IllegalArgument {
+                message: format!("expected Time column at position {pos}, got 
{other:?}"),
+            }),
         }
     }
 }
@@ -183,106 +216,121 @@ impl InternalRow for ColumnarRow {
         self.record_batch.num_columns()
     }
 
-    fn is_null_at(&self, pos: usize) -> bool {
-        self.record_batch.column(pos).is_null(self.row_id)
+    fn is_null_at(&self, pos: usize) -> Result<bool> {
+        Ok(self.column(pos)?.is_null(self.row_id))
     }
 
-    fn get_boolean(&self, pos: usize) -> bool {
-        self.record_batch
-            .column(pos)
-            .as_boolean()
-            .value(self.row_id)
+    fn get_boolean(&self, pos: usize) -> Result<bool> {
+        Ok(self
+            .column(pos)?
+            .as_boolean_opt()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected boolean array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_byte(&self, pos: usize) -> i8 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Int8Array>()
-            .expect("Expect byte array")
-            .value(self.row_id)
+    fn get_byte(&self, pos: usize) -> Result<i8> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Int8Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected byte array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_short(&self, pos: usize) -> i16 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Int16Array>()
-            .expect("Expect short array")
-            .value(self.row_id)
+    fn get_short(&self, pos: usize) -> Result<i16> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Int16Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected short array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_int(&self, pos: usize) -> i32 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .expect("Expect int array")
-            .value(self.row_id)
+    fn get_int(&self, pos: usize) -> Result<i32> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Int32Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected int array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_long(&self, pos: usize) -> i64 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .expect("Expect long array")
-            .value(self.row_id)
+    fn get_long(&self, pos: usize) -> Result<i64> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Int64Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected long array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_float(&self, pos: usize) -> f32 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Float32Array>()
-            .expect("Expect float32 array")
-            .value(self.row_id)
+    fn get_float(&self, pos: usize) -> Result<f32> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Float32Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected float32 array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_double(&self, pos: usize) -> f64 {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .expect("Expect float64 array")
-            .value(self.row_id)
+    fn get_double(&self, pos: usize) -> Result<f64> {
+        Ok(self
+            .column(pos)?
+            .as_primitive_opt::<Float64Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected float64 array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_char(&self, pos: usize, _length: usize) -> &str {
-        self.record_batch
-            .column(pos)
+    fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
+        Ok(self
+            .column(pos)?
             .as_any()
             .downcast_ref::<StringArray>()
-            .expect("Expected String array for char type")
-            .value(self.row_id)
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected String array for char type at 
position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_string(&self, pos: usize) -> &str {
-        self.record_batch
-            .column(pos)
+    fn get_string(&self, pos: usize) -> Result<&str> {
+        Ok(self
+            .column(pos)?
             .as_any()
             .downcast_ref::<StringArray>()
-            .expect("Expected String array.")
-            .value(self.row_id)
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected String array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
crate::row::Decimal {
+    fn get_decimal(
+        &self,
+        pos: usize,
+        precision: usize,
+        scale: usize,
+    ) -> Result<crate::row::Decimal> {
         use arrow::datatypes::DataType;
 
-        let column = self.record_batch.column(pos);
+        let column = self.column(pos)?;
         let array = column
-            .as_any()
-            .downcast_ref::<Decimal128Array>()
-            .unwrap_or_else(|| {
-                panic!(
-                    "Expected Decimal128Array at column {}, found: {:?}",
-                    pos,
+            .as_primitive_opt::<Decimal128Type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "expected Decimal128Array at column {pos}, found: {:?}",
                     column.data_type()
-                )
-            });
+                ),
+            })?;
 
         // Contract: caller must check is_null_at() before calling get_decimal.
-        // Calling on null value violates the contract and returns garbage data
         debug_assert!(
             !array.is_null(self.row_id),
             "get_decimal called on null value at pos {} row {}",
@@ -290,12 +338,16 @@ impl InternalRow for ColumnarRow {
             self.row_id
         );
 
-        // Read scale from Arrow schema field metadata
-        let schema = self.record_batch.schema();
-        let field = schema.field(pos);
-        let arrow_scale = match field.data_type() {
+        // Read scale from Arrow column data type
+        let arrow_scale = match column.data_type() {
             DataType::Decimal128(_p, s) => *s as i64,
-            dt => panic!("Expected Decimal128 data type at column {pos}, 
found: {dt:?}"),
+            dt => {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "expected Decimal128 data type at column {pos}, found: 
{dt:?}"
+                    ),
+                });
+            }
         };
 
         let i128_val = array.value(self.row_id);
@@ -307,60 +359,53 @@ impl InternalRow for ColumnarRow {
             precision as u32,
             scale as u32,
         )
-        .unwrap_or_else(|e| {
-            panic!(
-                "Failed to create Decimal at column {} row {}: {}",
-                pos, self.row_id, e
-            )
-        })
     }
 
-    fn get_date(&self, pos: usize) -> crate::row::datum::Date {
-        crate::row::datum::Date::new(self.read_date_from_arrow(pos))
+    fn get_date(&self, pos: usize) -> Result<Date> {
+        Ok(Date::new(self.read_date_from_arrow(pos)?))
     }
 
-    fn get_time(&self, pos: usize) -> crate::row::datum::Time {
-        crate::row::datum::Time::new(self.read_time_from_arrow(pos))
+    fn get_time(&self, pos: usize) -> Result<Time> {
+        Ok(Time::new(self.read_time_from_arrow(pos)?))
     }
 
-    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampNtz {
-        // Like Java's ArrowTimestampNtzColumnVector, we ignore the precision 
parameter
-        // and determine the conversion from the Arrow column's TimeUnit.
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
Result<TimestampNtz> {
         self.read_timestamp_from_arrow(
             pos,
             precision,
-            crate::row::datum::TimestampNtz::new,
-            crate::row::datum::TimestampNtz::from_millis_nanos,
+            TimestampNtz::new,
+            TimestampNtz::from_millis_nanos,
         )
     }
 
-    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampLtz {
-        // Like Java's ArrowTimestampLtzColumnVector, we ignore the precision 
parameter
-        // and determine the conversion from the Arrow column's TimeUnit.
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
Result<TimestampLtz> {
         self.read_timestamp_from_arrow(
             pos,
             precision,
-            crate::row::datum::TimestampLtz::new,
-            crate::row::datum::TimestampLtz::from_millis_nanos,
+            TimestampLtz::new,
+            TimestampLtz::from_millis_nanos,
         )
     }
 
-    fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<FixedSizeBinaryArray>()
-            .expect("Expected binary array.")
-            .value(self.row_id)
+    fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
+        Ok(self
+            .column(pos)?
+            .as_fixed_size_binary_opt()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected binary array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 
-    fn get_bytes(&self, pos: usize) -> &[u8] {
-        self.record_batch
-            .column(pos)
+    fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
+        Ok(self
+            .column(pos)?
             .as_any()
             .downcast_ref::<BinaryArray>()
-            .expect("Expected bytes array.")
-            .value(self.row_id)
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected bytes array at position {pos}"),
+            })?
+            .value(self.row_id))
     }
 }
 
@@ -368,8 +413,8 @@ impl InternalRow for ColumnarRow {
 mod tests {
     use super::*;
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, 
Int16Array, Int32Array,
-        Int64Array, StringArray,
+        BinaryArray, BooleanArray, Decimal128Array, Float32Array, 
Float64Array, Int8Array,
+        Int16Array, Int32Array, Int64Array, StringArray,
     };
     use arrow::datatypes::{DataType, Field, Schema};
 
@@ -407,16 +452,16 @@ mod tests {
 
         let mut row = ColumnarRow::new(Arc::new(batch));
         assert_eq!(row.get_field_count(), 10);
-        assert!(row.get_boolean(0));
-        assert_eq!(row.get_byte(1), 1);
-        assert_eq!(row.get_short(2), 2);
-        assert_eq!(row.get_int(3), 3);
-        assert_eq!(row.get_long(4), 4);
-        assert_eq!(row.get_float(5), 1.25);
-        assert_eq!(row.get_double(6), 2.5);
-        assert_eq!(row.get_string(7), "hello");
-        assert_eq!(row.get_bytes(8), b"data");
-        assert_eq!(row.get_char(9, 2), "ab");
+        assert!(row.get_boolean(0).unwrap());
+        assert_eq!(row.get_byte(1).unwrap(), 1);
+        assert_eq!(row.get_short(2).unwrap(), 2);
+        assert_eq!(row.get_int(3).unwrap(), 3);
+        assert_eq!(row.get_long(4).unwrap(), 4);
+        assert_eq!(row.get_float(5).unwrap(), 1.25);
+        assert_eq!(row.get_double(6).unwrap(), 2.5);
+        assert_eq!(row.get_string(7).unwrap(), "hello");
+        assert_eq!(row.get_bytes(8).unwrap(), b"data");
+        assert_eq!(row.get_char(9, 2).unwrap(), "ab");
         row.set_row_id(0);
         assert_eq!(row.get_row_id(), 0);
     }
@@ -465,12 +510,12 @@ mod tests {
 
         // Verify decimal values
         assert_eq!(
-            row.get_decimal(0, 10, 2),
+            row.get_decimal(0, 10, 2).unwrap(),
             
crate::row::Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 
10, 2)
                 .unwrap()
         );
         assert_eq!(
-            row.get_decimal(1, 20, 5),
+            row.get_decimal(1, 20, 5).unwrap(),
             crate::row::Decimal::from_big_decimal(
                 BigDecimal::new(BigInt::from(1234567890), 5),
                 20,
@@ -479,7 +524,7 @@ mod tests {
             .unwrap()
         );
         assert_eq!(
-            row.get_decimal(2, 38, 10),
+            row.get_decimal(2, 38, 10).unwrap(),
             crate::row::Decimal::from_big_decimal(
                 BigDecimal::new(BigInt::from(999999999999999999i128), 10),
                 38,
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index 2322207..918ebdf 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use crate::client::WriteFormat;
+use crate::error::Result;
 use crate::metadata::RowType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, 
CompactedRowReader};
-use crate::row::{GenericRow, InternalRow};
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use crate::row::{Decimal, GenericRow, InternalRow};
 use std::sync::{Arc, OnceLock};
 
 // Reference implementation:
@@ -81,74 +83,80 @@ impl<'a> InternalRow for CompactedRow<'a> {
         self.arity
     }
 
-    fn is_null_at(&self, pos: usize) -> bool {
-        self.deserializer.get_row_type().fields().as_slice()[pos]
-            .data_type
-            .is_nullable()
-            && self.reader.is_null_at(pos)
+    fn is_null_at(&self, pos: usize) -> Result<bool> {
+        let fields = self.deserializer.get_row_type().fields();
+        if pos >= fields.len() {
+            return Err(crate::error::Error::IllegalArgument {
+                message: format!(
+                    "position {pos} out of bounds (row has {} fields)",
+                    fields.len()
+                ),
+            });
+        }
+        Ok(fields.as_slice()[pos].data_type.is_nullable() && 
self.reader.is_null_at(pos))
     }
 
-    fn get_boolean(&self, pos: usize) -> bool {
+    fn get_boolean(&self, pos: usize) -> Result<bool> {
         self.decoded_row().get_boolean(pos)
     }
 
-    fn get_byte(&self, pos: usize) -> i8 {
+    fn get_byte(&self, pos: usize) -> Result<i8> {
         self.decoded_row().get_byte(pos)
     }
 
-    fn get_short(&self, pos: usize) -> i16 {
+    fn get_short(&self, pos: usize) -> Result<i16> {
         self.decoded_row().get_short(pos)
     }
 
-    fn get_int(&self, pos: usize) -> i32 {
+    fn get_int(&self, pos: usize) -> Result<i32> {
         self.decoded_row().get_int(pos)
     }
 
-    fn get_long(&self, pos: usize) -> i64 {
+    fn get_long(&self, pos: usize) -> Result<i64> {
         self.decoded_row().get_long(pos)
     }
 
-    fn get_float(&self, pos: usize) -> f32 {
+    fn get_float(&self, pos: usize) -> Result<f32> {
         self.decoded_row().get_float(pos)
     }
 
-    fn get_double(&self, pos: usize) -> f64 {
+    fn get_double(&self, pos: usize) -> Result<f64> {
         self.decoded_row().get_double(pos)
     }
 
-    fn get_char(&self, pos: usize, length: usize) -> &str {
+    fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
         self.decoded_row().get_char(pos, length)
     }
 
-    fn get_string(&self, pos: usize) -> &str {
+    fn get_string(&self, pos: usize) -> Result<&str> {
         self.decoded_row().get_string(pos)
     }
 
-    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
crate::row::Decimal {
+    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Result<Decimal> {
         self.decoded_row().get_decimal(pos, precision, scale)
     }
 
-    fn get_date(&self, pos: usize) -> crate::row::datum::Date {
+    fn get_date(&self, pos: usize) -> Result<Date> {
         self.decoded_row().get_date(pos)
     }
 
-    fn get_time(&self, pos: usize) -> crate::row::datum::Time {
+    fn get_time(&self, pos: usize) -> Result<Time> {
         self.decoded_row().get_time(pos)
     }
 
-    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampNtz {
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
Result<TimestampNtz> {
         self.decoded_row().get_timestamp_ntz(pos, precision)
     }
 
-    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampLtz {
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
Result<TimestampLtz> {
         self.decoded_row().get_timestamp_ltz(pos, precision)
     }
 
-    fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+    fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
         self.decoded_row().get_binary(pos, length)
     }
 
-    fn get_bytes(&self, pos: usize) -> &[u8] {
+    fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
         self.decoded_row().get_bytes(pos)
     }
 
@@ -203,15 +211,15 @@ mod tests {
         let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_field_count(), 9);
-        assert!(row.get_boolean(0));
-        assert_eq!(row.get_byte(1), 1);
-        assert_eq!(row.get_short(2), 100);
-        assert_eq!(row.get_int(3), 1000);
-        assert_eq!(row.get_long(4), 10000);
-        assert_eq!(row.get_float(5), 1.5);
-        assert_eq!(row.get_double(6), 2.5);
-        assert_eq!(row.get_string(7), "Hello World");
-        assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
+        assert!(row.get_boolean(0).unwrap());
+        assert_eq!(row.get_byte(1).unwrap(), 1);
+        assert_eq!(row.get_short(2).unwrap(), 100);
+        assert_eq!(row.get_int(3).unwrap(), 1000);
+        assert_eq!(row.get_long(4).unwrap(), 10000);
+        assert_eq!(row.get_float(5).unwrap(), 1.5);
+        assert_eq!(row.get_double(6).unwrap(), 2.5);
+        assert_eq!(row.get_string(7).unwrap(), "Hello World");
+        assert_eq!(row.get_bytes(8).unwrap(), &[1, 2, 3, 4, 5]);
 
         // Test with nulls and negative values
         let row_type = RowType::with_data_types(vec![
@@ -228,13 +236,13 @@ mod tests {
         let bytes = writer.to_bytes();
         let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
-        assert!(!row.is_null_at(0));
-        assert!(row.is_null_at(1));
-        assert!(!row.is_null_at(2));
-        assert_eq!(row.get_int(0), -42);
-        assert_eq!(row.get_double(2), 2.71);
+        assert!(!row.is_null_at(0).unwrap());
+        assert!(row.is_null_at(1).unwrap());
+        assert!(!row.is_null_at(2).unwrap());
+        assert_eq!(row.get_int(0).unwrap(), -42);
+        assert_eq!(row.get_double(2).unwrap(), 2.71);
         // Verify caching works on repeated reads
-        assert_eq!(row.get_int(0), -42);
+        assert_eq!(row.get_int(0).unwrap(), -42);
     }
 
     #[test]
@@ -285,30 +293,33 @@ mod tests {
         let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         // Verify all values
-        assert_eq!(row.get_date(0).get_inner(), 19651);
-        assert_eq!(row.get_time(1).get_inner(), 34200000);
-        assert_eq!(row.get_timestamp_ntz(2, 3).get_millisecond(), 
1698235273182);
+        assert_eq!(row.get_date(0).unwrap().get_inner(), 19651);
+        assert_eq!(row.get_time(1).unwrap().get_inner(), 34200000);
+        assert_eq!(
+            row.get_timestamp_ntz(2, 3).unwrap().get_millisecond(),
+            1698235273182
+        );
         assert_eq!(
-            row.get_timestamp_ltz(3, 3).get_epoch_millisecond(),
+            row.get_timestamp_ltz(3, 3).unwrap().get_epoch_millisecond(),
             1698235273182
         );
-        let read_ts_ntz = row.get_timestamp_ntz(4, 6);
+        let read_ts_ntz = row.get_timestamp_ntz(4, 6).unwrap();
         assert_eq!(read_ts_ntz.get_millisecond(), 1698235273182);
         assert_eq!(read_ts_ntz.get_nano_of_millisecond(), 123456);
-        let read_ts_ltz = row.get_timestamp_ltz(5, 9);
+        let read_ts_ltz = row.get_timestamp_ltz(5, 9).unwrap();
         assert_eq!(read_ts_ltz.get_epoch_millisecond(), 1698235273182);
         assert_eq!(read_ts_ltz.get_nano_of_millisecond(), 987654);
         // Assert on Decimal equality
-        assert_eq!(row.get_decimal(6, 10, 2), small_decimal);
-        assert_eq!(row.get_decimal(7, 28, 10), large_decimal);
+        assert_eq!(row.get_decimal(6, 10, 2).unwrap(), small_decimal);
+        assert_eq!(row.get_decimal(7, 28, 10).unwrap(), large_decimal);
 
         // Assert on Decimal components to catch any regressions
-        let read_small_decimal = row.get_decimal(6, 10, 2);
+        let read_small_decimal = row.get_decimal(6, 10, 2).unwrap();
         assert_eq!(read_small_decimal.precision(), 10);
         assert_eq!(read_small_decimal.scale(), 2);
         assert_eq!(read_small_decimal.to_unscaled_long().unwrap(), 12345);
 
-        let read_large_decimal = row.get_decimal(7, 28, 10);
+        let read_large_decimal = row.get_decimal(7, 28, 10).unwrap();
         assert_eq!(read_large_decimal.precision(), 28);
         assert_eq!(read_large_decimal.scale(), 10);
         assert_eq!(
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs 
b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index 563c1c9..d201450 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -83,7 +83,7 @@ impl KeyEncoder for CompactedKeyEncoder {
 
         // iterate all the fields of the row, and encode each field
         for (pos, field_getter) in self.field_getters.iter().enumerate() {
-            match &field_getter.get_field(row) {
+            match &field_getter.get_field(row)? {
                 Datum::Null => {
                     return Err(IllegalArgument {
                         message: format!("Cannot encode key with null value at 
position: {pos:?}"),
diff --git a/crates/fluss/src/row/field_getter.rs 
b/crates/fluss/src/row/field_getter.rs
index cbffa4d..d6b9fc9 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::error::Result;
 use crate::metadata::{DataType, RowType};
 use crate::row::{Datum, InternalRow};
 
@@ -24,11 +25,11 @@ pub enum FieldGetter {
     NonNullable(InnerFieldGetter),
 }
 impl FieldGetter {
-    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Result<Datum<'a>> 
{
         match self {
             FieldGetter::Nullable(getter) => {
-                if row.is_null_at(getter.pos()) {
-                    Datum::Null
+                if row.is_null_at(getter.pos())? {
+                    Ok(Datum::Null)
                 } else {
                     getter.get_field(row)
                 }
@@ -151,33 +152,33 @@ pub enum InnerFieldGetter {
 }
 
 impl InnerFieldGetter {
-    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
-        match self {
-            InnerFieldGetter::Char { pos, len } => 
Datum::from(row.get_char(*pos, *len)),
-            InnerFieldGetter::String { pos } => 
Datum::from(row.get_string(*pos)),
-            InnerFieldGetter::Bool { pos } => 
Datum::from(row.get_boolean(*pos)),
-            InnerFieldGetter::Binary { pos, len } => 
Datum::from(row.get_binary(*pos, *len)),
-            InnerFieldGetter::Bytes { pos } => 
Datum::from(row.get_bytes(*pos)),
-            InnerFieldGetter::TinyInt { pos } => 
Datum::from(row.get_byte(*pos)),
-            InnerFieldGetter::SmallInt { pos } => 
Datum::from(row.get_short(*pos)),
-            InnerFieldGetter::Int { pos } => Datum::from(row.get_int(*pos)),
-            InnerFieldGetter::BigInt { pos } => 
Datum::from(row.get_long(*pos)),
-            InnerFieldGetter::Float { pos } => 
Datum::from(row.get_float(*pos)),
-            InnerFieldGetter::Double { pos } => 
Datum::from(row.get_double(*pos)),
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Result<Datum<'a>> 
{
+        Ok(match self {
+            InnerFieldGetter::Char { pos, len } => 
Datum::from(row.get_char(*pos, *len)?),
+            InnerFieldGetter::String { pos } => 
Datum::from(row.get_string(*pos)?),
+            InnerFieldGetter::Bool { pos } => 
Datum::from(row.get_boolean(*pos)?),
+            InnerFieldGetter::Binary { pos, len } => 
Datum::from(row.get_binary(*pos, *len)?),
+            InnerFieldGetter::Bytes { pos } => 
Datum::from(row.get_bytes(*pos)?),
+            InnerFieldGetter::TinyInt { pos } => 
Datum::from(row.get_byte(*pos)?),
+            InnerFieldGetter::SmallInt { pos } => 
Datum::from(row.get_short(*pos)?),
+            InnerFieldGetter::Int { pos } => Datum::from(row.get_int(*pos)?),
+            InnerFieldGetter::BigInt { pos } => 
Datum::from(row.get_long(*pos)?),
+            InnerFieldGetter::Float { pos } => 
Datum::from(row.get_float(*pos)?),
+            InnerFieldGetter::Double { pos } => 
Datum::from(row.get_double(*pos)?),
             InnerFieldGetter::Decimal {
                 pos,
                 precision,
                 scale,
-            } => Datum::Decimal(row.get_decimal(*pos, *precision, *scale)),
-            InnerFieldGetter::Date { pos } => Datum::Date(row.get_date(*pos)),
-            InnerFieldGetter::Time { pos } => Datum::Time(row.get_time(*pos)),
+            } => Datum::Decimal(row.get_decimal(*pos, *precision, *scale)?),
+            InnerFieldGetter::Date { pos } => Datum::Date(row.get_date(*pos)?),
+            InnerFieldGetter::Time { pos } => Datum::Time(row.get_time(*pos)?),
             InnerFieldGetter::Timestamp { pos, precision } => {
-                Datum::TimestampNtz(row.get_timestamp_ntz(*pos, *precision))
+                Datum::TimestampNtz(row.get_timestamp_ntz(*pos, *precision)?)
             }
             InnerFieldGetter::TimestampLtz { pos, precision } => {
-                Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision))
+                Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?)
             } //TODO Array, Map, Row
-        }
+        })
     }
 
     pub fn pos(&self) -> usize {
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 276dcca..8fb777d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -54,67 +54,69 @@ impl<'a> BinaryRow<'a> {
     }
 }
 
-// TODO make functions return Result<?> for better error handling
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+
 pub trait InternalRow: Send + Sync {
     /// Returns the number of fields in this row
     fn get_field_count(&self) -> usize;
 
     /// Returns true if the element is null at the given position
-    fn is_null_at(&self, pos: usize) -> bool;
+    fn is_null_at(&self, pos: usize) -> Result<bool>;
 
     /// Returns the boolean value at the given position
-    fn get_boolean(&self, pos: usize) -> bool;
+    fn get_boolean(&self, pos: usize) -> Result<bool>;
 
     /// Returns the byte value at the given position
-    fn get_byte(&self, pos: usize) -> i8;
+    fn get_byte(&self, pos: usize) -> Result<i8>;
 
     /// Returns the short value at the given position
-    fn get_short(&self, pos: usize) -> i16;
+    fn get_short(&self, pos: usize) -> Result<i16>;
 
     /// Returns the integer value at the given position
-    fn get_int(&self, pos: usize) -> i32;
+    fn get_int(&self, pos: usize) -> Result<i32>;
 
     /// Returns the long value at the given position
-    fn get_long(&self, pos: usize) -> i64;
+    fn get_long(&self, pos: usize) -> Result<i64>;
 
     /// Returns the float value at the given position
-    fn get_float(&self, pos: usize) -> f32;
+    fn get_float(&self, pos: usize) -> Result<f32>;
 
     /// Returns the double value at the given position
-    fn get_double(&self, pos: usize) -> f64;
+    fn get_double(&self, pos: usize) -> Result<f64>;
 
     /// Returns the string value at the given position with fixed length
-    fn get_char(&self, pos: usize, length: usize) -> &str;
+    fn get_char(&self, pos: usize, length: usize) -> Result<&str>;
 
     /// Returns the string value at the given position
-    fn get_string(&self, pos: usize) -> &str;
+    fn get_string(&self, pos: usize) -> Result<&str>;
 
     /// Returns the decimal value at the given position
-    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Decimal;
+    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Result<Decimal>;
 
     /// Returns the date value at the given position (date as days since epoch)
-    fn get_date(&self, pos: usize) -> datum::Date;
+    fn get_date(&self, pos: usize) -> Result<Date>;
 
     /// Returns the time value at the given position (time as milliseconds 
since midnight)
-    fn get_time(&self, pos: usize) -> datum::Time;
+    fn get_time(&self, pos: usize) -> Result<Time>;
 
     /// Returns the timestamp value at the given position (timestamp without 
timezone)
     ///
     /// The precision is required to determine whether the timestamp value was 
stored
     /// in a compact representation (precision <= 3) or with nanosecond 
precision.
-    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
datum::TimestampNtz;
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
Result<TimestampNtz>;
 
     /// Returns the timestamp value at the given position (timestamp with 
local timezone)
     ///
     /// The precision is required to determine whether the timestamp value was 
stored
     /// in a compact representation (precision <= 3) or with nanosecond 
precision.
-    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
datum::TimestampLtz;
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
Result<TimestampLtz>;
 
     /// Returns the binary value at the given position with fixed length
-    fn get_binary(&self, pos: usize, length: usize) -> &[u8];
+    fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]>;
 
     /// Returns the binary value at the given position
-    fn get_bytes(&self, pos: usize) -> &[u8];
+    fn get_bytes(&self, pos: usize) -> Result<&[u8]>;
 
     /// Returns encoded bytes if already encoded
     fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
@@ -127,98 +129,149 @@ pub struct GenericRow<'a> {
     pub values: Vec<Datum<'a>>,
 }
 
+impl<'a> GenericRow<'a> {
+    fn get_value(&self, pos: usize) -> Result<&Datum<'a>> {
+        self.values.get(pos).ok_or_else(|| IllegalArgument {
+            message: format!(
+                "position {pos} out of bounds (row has {} fields)",
+                self.values.len()
+            ),
+        })
+    }
+
+    fn try_convert<T: TryFrom<&'a Datum<'a>>>(
+        &'a self,
+        pos: usize,
+        expected_type: &str,
+    ) -> Result<T> {
+        let datum = self.get_value(pos)?;
+        T::try_from(datum).map_err(|_| IllegalArgument {
+            message: format!(
+                "type mismatch at position {pos}: expected {expected_type}, 
got {datum:?}"
+            ),
+        })
+    }
+}
+
 impl<'a> InternalRow for GenericRow<'a> {
     fn get_field_count(&self) -> usize {
         self.values.len()
     }
 
-    fn is_null_at(&self, pos: usize) -> bool {
-        self.values
-            .get(pos)
-            .expect("position out of bounds")
-            .is_null()
+    fn is_null_at(&self, pos: usize) -> Result<bool> {
+        Ok(self.get_value(pos)?.is_null())
     }
 
-    fn get_boolean(&self, pos: usize) -> bool {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_boolean(&self, pos: usize) -> Result<bool> {
+        self.try_convert(pos, "Boolean")
     }
 
-    fn get_byte(&self, pos: usize) -> i8 {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_byte(&self, pos: usize) -> Result<i8> {
+        self.try_convert(pos, "TinyInt")
     }
 
-    fn get_short(&self, pos: usize) -> i16 {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_short(&self, pos: usize) -> Result<i16> {
+        self.try_convert(pos, "SmallInt")
     }
 
-    fn get_int(&self, pos: usize) -> i32 {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_int(&self, pos: usize) -> Result<i32> {
+        self.try_convert(pos, "Int")
     }
 
-    fn get_long(&self, _pos: usize) -> i64 {
-        self.values.get(_pos).unwrap().try_into().unwrap()
+    fn get_long(&self, pos: usize) -> Result<i64> {
+        self.try_convert(pos, "BigInt")
     }
 
-    fn get_float(&self, pos: usize) -> f32 {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_float(&self, pos: usize) -> Result<f32> {
+        self.try_convert(pos, "Float")
     }
 
-    fn get_double(&self, pos: usize) -> f64 {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_double(&self, pos: usize) -> Result<f64> {
+        self.try_convert(pos, "Double")
     }
 
-    fn get_char(&self, pos: usize, _length: usize) -> &str {
+    fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
         // don't check length, following java client
         self.get_string(pos)
     }
 
-    fn get_string(&self, pos: usize) -> &str {
-        self.values.get(pos).unwrap().try_into().unwrap()
+    fn get_string(&self, pos: usize) -> Result<&str> {
+        self.try_convert(pos, "String")
     }
 
-    fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> 
Decimal {
-        match self.values.get(pos).unwrap() {
-            Datum::Decimal(d) => d.clone(),
-            other => panic!("Expected Decimal at pos {pos:?}, got {other:?}"),
+    fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> 
Result<Decimal> {
+        match self.get_value(pos)? {
+            Datum::Decimal(d) => Ok(d.clone()),
+            other => Err(IllegalArgument {
+                message: format!(
+                    "type mismatch at position {pos}: expected Decimal, got 
{other:?}"
+                ),
+            }),
         }
     }
 
-    fn get_date(&self, pos: usize) -> datum::Date {
-        match self.values.get(pos).unwrap() {
-            Datum::Date(d) => *d,
-            Datum::Int32(i) => datum::Date::new(*i),
-            other => panic!("Expected Date or Int32 at pos {pos:?}, got 
{other:?}"),
+    fn get_date(&self, pos: usize) -> Result<Date> {
+        match self.get_value(pos)? {
+            Datum::Date(d) => Ok(*d),
+            Datum::Int32(i) => Ok(Date::new(*i)),
+            other => Err(IllegalArgument {
+                message: format!(
+                    "type mismatch at position {pos}: expected Date or Int32, 
got {other:?}"
+                ),
+            }),
         }
     }
 
-    fn get_time(&self, pos: usize) -> datum::Time {
-        match self.values.get(pos).unwrap() {
-            Datum::Time(t) => *t,
-            Datum::Int32(i) => datum::Time::new(*i),
-            other => panic!("Expected Time or Int32 at pos {pos:?}, got 
{other:?}"),
+    fn get_time(&self, pos: usize) -> Result<Time> {
+        match self.get_value(pos)? {
+            Datum::Time(t) => Ok(*t),
+            Datum::Int32(i) => Ok(Time::new(*i)),
+            other => Err(IllegalArgument {
+                message: format!(
+                    "type mismatch at position {pos}: expected Time or Int32, 
got {other:?}"
+                ),
+            }),
         }
     }
 
-    fn get_timestamp_ntz(&self, pos: usize, _precision: u32) -> 
datum::TimestampNtz {
-        match self.values.get(pos).unwrap() {
-            Datum::TimestampNtz(t) => *t,
-            other => panic!("Expected TimestampNtz at pos {pos:?}, got 
{other:?}"),
+    fn get_timestamp_ntz(&self, pos: usize, _precision: u32) -> 
Result<TimestampNtz> {
+        match self.get_value(pos)? {
+            Datum::TimestampNtz(t) => Ok(*t),
+            other => Err(IllegalArgument {
+                message: format!(
+                    "type mismatch at position {pos}: expected TimestampNtz, 
got {other:?}"
+                ),
+            }),
         }
     }
 
-    fn get_timestamp_ltz(&self, pos: usize, _precision: u32) -> 
datum::TimestampLtz {
-        match self.values.get(pos).unwrap() {
-            Datum::TimestampLtz(t) => *t,
-            other => panic!("Expected TimestampLtz at pos {pos:?}, got 
{other:?}"),
+    fn get_timestamp_ltz(&self, pos: usize, _precision: u32) -> 
Result<TimestampLtz> {
+        match self.get_value(pos)? {
+            Datum::TimestampLtz(t) => Ok(*t),
+            other => Err(IllegalArgument {
+                message: format!(
+                    "type mismatch at position {pos}: expected TimestampLtz, 
got {other:?}"
+                ),
+            }),
         }
     }
 
-    fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
-        self.values.get(pos).unwrap().as_blob()
+    fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
+        match self.get_value(pos)? {
+            Datum::Blob(b) => Ok(b.as_ref()),
+            other => Err(IllegalArgument {
+                message: format!("type mismatch at position {pos}: expected 
Binary, got {other:?}"),
+            }),
+        }
     }
 
-    fn get_bytes(&self, pos: usize) -> &[u8] {
-        self.values.get(pos).unwrap().as_blob()
+    fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
+        match self.get_value(pos)? {
+            Datum::Blob(b) => Ok(b.as_ref()),
+            other => Err(IllegalArgument {
+                message: format!("type mismatch at position {pos}: expected 
Bytes, got {other:?}"),
+            }),
+        }
     }
 }
 
@@ -268,17 +321,27 @@ mod tests {
         row.set_field(0, Datum::Null);
         row.set_field(1, 42_i32);
 
-        assert!(row.is_null_at(0));
-        assert!(!row.is_null_at(1));
+        assert!(row.is_null_at(0).unwrap());
+        assert!(!row.is_null_at(1).unwrap());
+    }
+
+    #[test]
+    fn is_null_at_out_of_bounds_returns_error() {
+        let row = GenericRow::from_data(vec![42_i32]);
+        let err = row.is_null_at(5).unwrap_err();
+        assert!(
+            err.to_string().contains("out of bounds"),
+            "Expected out of bounds error, got: {err}"
+        );
     }
 
     #[test]
     fn new_initializes_nulls() {
         let row = GenericRow::new(3);
         assert_eq!(row.get_field_count(), 3);
-        assert!(row.is_null_at(0));
-        assert!(row.is_null_at(1));
-        assert!(row.is_null_at(2));
+        assert!(row.is_null_at(0).unwrap());
+        assert!(row.is_null_at(1).unwrap());
+        assert!(row.is_null_at(2).unwrap());
     }
 
     #[test]
@@ -288,8 +351,28 @@ mod tests {
         row.set_field(0, 123_i32);
         // Fields 1 and 2 remain null
         assert_eq!(row.get_field_count(), 3);
-        assert_eq!(row.get_int(0), 123);
-        assert!(row.is_null_at(1));
-        assert!(row.is_null_at(2));
+        assert_eq!(row.get_int(0).unwrap(), 123);
+        assert!(row.is_null_at(1).unwrap());
+        assert!(row.is_null_at(2).unwrap());
+    }
+
+    #[test]
+    fn type_mismatch_returns_error() {
+        let row = GenericRow::from_data(vec![Datum::Int64(999)]);
+        let err = row.get_string(0).unwrap_err();
+        assert!(
+            err.to_string().contains("type mismatch"),
+            "Expected type mismatch error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn out_of_bounds_returns_error() {
+        let row = GenericRow::from_data(vec![42_i32]);
+        let err = row.get_int(5).unwrap_err();
+        assert!(
+            err.to_string().contains("out of bounds"),
+            "Expected out of bounds error, got: {err}"
+        );
     }
 }
diff --git a/crates/fluss/src/row/row_decoder.rs 
b/crates/fluss/src/row/row_decoder.rs
index 9f9b421..aea8c86 100644
--- a/crates/fluss/src/row/row_decoder.rs
+++ b/crates/fluss/src/row/row_decoder.rs
@@ -112,8 +112,8 @@ mod tests {
 
         // Verify
         assert_eq!(row.get_field_count(), 2);
-        assert_eq!(row.get_int(0), 42);
-        assert_eq!(row.get_string(1), "hello");
+        assert_eq!(row.get_int(0).unwrap(), 42);
+        assert_eq!(row.get_string(1).unwrap(), "hello");
     }
 
     #[test]
@@ -131,7 +131,7 @@ mod tests {
         let row = decoder.decode(&data);
 
         // Verify
-        assert_eq!(row.get_int(0), 100);
-        assert_eq!(row.get_string(1), "world");
+        assert_eq!(row.get_int(0).unwrap(), 100);
+        assert_eq!(row.get_string(1).unwrap(), "world");
     }
 }
diff --git a/crates/fluss/tests/integration/kv_table.rs 
b/crates/fluss/tests/integration/kv_table.rs
index ab5f5b6..c101a18 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -64,7 +64,7 @@ mod kv_table_test {
         let cluster = get_fluss_cluster();
         let connection = cluster.get_fluss_connection().await;
 
-        let admin = connection.get_admin().await.expect("Failed to get admin");
+        let admin = connection.get_admin().await.unwrap();
 
         let table_path = TablePath::new("fluss", "test_upsert_and_lookup");
 
@@ -83,10 +83,7 @@ mod kv_table_test {
 
         create_table(&admin, &table_path, &table_descriptor).await;
 
-        let table = connection
-            .get_table(&table_path)
-            .await
-            .expect("Failed to get table");
+        let table = connection.get_table(&table_path).await.unwrap();
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
         let upsert_writer = table_upsert
@@ -118,14 +115,11 @@ mod kv_table_test {
                 .lookup(&make_key(*id))
                 .await
                 .expect("Failed to lookup");
-            let row = result
-                .get_single_row()
-                .expect("Failed to get row")
-                .expect("Row should exist");
+            let row = result.get_single_row().unwrap().expect("Row should 
exist");
 
-            assert_eq!(row.get_int(0), *id, "id mismatch");
-            assert_eq!(row.get_string(1), *expected_name, "name mismatch");
-            assert_eq!(row.get_long(2), *expected_age, "age mismatch");
+            assert_eq!(row.get_int(0).unwrap(), *id, "id mismatch");
+            assert_eq!(row.get_string(1).unwrap(), *expected_name, "name 
mismatch");
+            assert_eq!(row.get_long(2).unwrap(), *expected_age, "age 
mismatch");
         }
 
         // Update the record with new age (await acknowledgment)
@@ -144,18 +138,15 @@ mod kv_table_test {
             .lookup(&make_key(1))
             .await
             .expect("Failed to lookup after update");
-        let found_row = result
-            .get_single_row()
-            .expect("Failed to get row")
-            .expect("Row should exist");
+        let found_row = result.get_single_row().unwrap().expect("Row should 
exist");
         assert_eq!(
-            found_row.get_long(2),
-            updated_row.get_long(2),
+            found_row.get_long(2).unwrap(),
+            updated_row.get_long(2).unwrap(),
             "Age should be updated"
         );
         assert_eq!(
-            found_row.get_string(1),
-            updated_row.get_string(1),
+            found_row.get_string(1).unwrap(),
+            updated_row.get_string(1).unwrap(),
             "Name should remain unchanged"
         );
 
@@ -174,10 +165,7 @@ mod kv_table_test {
             .await
             .expect("Failed to lookup deleted record");
         assert!(
-            result
-                .get_single_row()
-                .expect("Failed to get row")
-                .is_none(),
+            result.get_single_row().unwrap().is_none(),
             "Record 1 should not exist after delete"
         );
 
@@ -188,10 +176,7 @@ mod kv_table_test {
                 .await
                 .expect("Failed to lookup");
             assert!(
-                result
-                    .get_single_row()
-                    .expect("Failed to get row")
-                    .is_some(),
+                result.get_single_row().unwrap().is_some(),
                 "Record {} should still exist after deleting record 1",
                 i
             );
@@ -203,10 +188,7 @@ mod kv_table_test {
             .await
             .expect("Failed to lookup non-existent key");
         assert!(
-            result
-                .get_single_row()
-                .expect("Failed to get row")
-                .is_none(),
+            result.get_single_row().unwrap().is_none(),
             "Non-existent key should return None"
         );
 
@@ -221,7 +203,7 @@ mod kv_table_test {
         let cluster = get_fluss_cluster();
         let connection = cluster.get_fluss_connection().await;
 
-        let admin = connection.get_admin().await.expect("Failed to get admin");
+        let admin = connection.get_admin().await.unwrap();
 
         let table_path = TablePath::new("fluss", "test_composite_pk");
 
@@ -240,10 +222,7 @@ mod kv_table_test {
 
         create_table(&admin, &table_path, &table_descriptor).await;
 
-        let table = connection
-            .get_table(&table_path)
-            .await
-            .expect("Failed to get table");
+        let table = connection.get_table(&table_path).await.unwrap();
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
         let upsert_writer = table_upsert
@@ -279,22 +258,24 @@ mod kv_table_test {
         key.set_field(0, "US");
         key.set_field(1, 1);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
-        let row = result
-            .get_single_row()
-            .expect("Failed to get row")
-            .expect("Row should exist");
-        assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
+        let row = result.get_single_row().unwrap().expect("Row should exist");
+        assert_eq!(
+            row.get_long(2).unwrap(),
+            100,
+            "Score for (US, 1) should be 100"
+        );
 
         // Lookup (EU, 2) - should return score 250
         let mut key = GenericRow::new(3);
         key.set_field(0, "EU");
         key.set_field(1, 2);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
-        let row = result
-            .get_single_row()
-            .expect("Failed to get row")
-            .expect("Row should exist");
-        assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
+        let row = result.get_single_row().unwrap().expect("Row should exist");
+        assert_eq!(
+            row.get_long(2).unwrap(),
+            250,
+            "Score for (EU, 2) should be 250"
+        );
 
         // Update (US, 1) score (await acknowledgment)
         let mut update_row = GenericRow::new(3);
@@ -312,13 +293,10 @@ mod kv_table_test {
         key.set_field(0, "US");
         key.set_field(1, 1);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
-        let row = result
-            .get_single_row()
-            .expect("Failed to get row")
-            .expect("Row should exist");
+        let row = result.get_single_row().unwrap().expect("Row should exist");
         assert_eq!(
-            row.get_long(2),
-            update_row.get_long(2),
+            row.get_long(2).unwrap(),
+            update_row.get_long(2).unwrap(),
             "Row score should be updated"
         );
 
@@ -393,10 +371,10 @@ mod kv_table_test {
             .expect("Failed to get row")
             .expect("Row should exist");
 
-        assert_eq!(found_row.get_int(0), 1);
-        assert_eq!(found_row.get_string(1), "Verso");
-        assert_eq!(found_row.get_long(2), 32i64);
-        assert_eq!(found_row.get_long(3), 6942i64);
+        assert_eq!(found_row.get_int(0).unwrap(), 1);
+        assert_eq!(found_row.get_string(1).unwrap(), "Verso");
+        assert_eq!(found_row.get_long(2).unwrap(), 32i64);
+        assert_eq!(found_row.get_long(3).unwrap(), 6942i64);
 
         // Create partial update writer to update only score column
         let partial_upsert = table_upsert
@@ -428,14 +406,22 @@ mod kv_table_test {
             .expect("Failed to get row")
             .expect("Row should exist");
 
-        assert_eq!(found_row.get_int(0), 1, "id should remain 1");
+        assert_eq!(found_row.get_int(0).unwrap(), 1, "id should remain 1");
         assert_eq!(
-            found_row.get_string(1),
+            found_row.get_string(1).unwrap(),
             "Verso",
             "name should remain unchanged"
         );
-        assert_eq!(found_row.get_long(2), 32, "age should remain unchanged");
-        assert_eq!(found_row.get_long(3), 420, "score should be updated to 
420");
+        assert_eq!(
+            found_row.get_long(2).unwrap(),
+            32,
+            "age should remain unchanged"
+        );
+        assert_eq!(
+            found_row.get_long(3).unwrap(),
+            420,
+            "score should be updated to 420"
+        );
 
         admin
             .drop_table(&table_path, false)
@@ -524,10 +510,10 @@ mod kv_table_test {
                 .expect("Failed to get row")
                 .expect("Row should exist");
 
-            assert_eq!(row.get_string(0), *region, "region mismatch");
-            assert_eq!(row.get_int(1), *user_id, "user_id mismatch");
-            assert_eq!(row.get_string(2), *expected_name, "name mismatch");
-            assert_eq!(row.get_long(3), *expected_score, "score mismatch");
+            assert_eq!(row.get_string(0).unwrap(), *region, "region mismatch");
+            assert_eq!(row.get_int(1).unwrap(), *user_id, "user_id mismatch");
+            assert_eq!(row.get_string(2).unwrap(), *expected_name, "name 
mismatch");
+            assert_eq!(row.get_long(3).unwrap(), *expected_score, "score 
mismatch");
         }
 
         // Test update within a partition (await acknowledgment)
@@ -551,8 +537,8 @@ mod kv_table_test {
             .get_single_row()
             .expect("Failed to get row")
             .expect("Row should exist");
-        assert_eq!(row.get_string(2), "Gustave Updated");
-        assert_eq!(row.get_long(3), 999);
+        assert_eq!(row.get_string(2).unwrap(), "Gustave Updated");
+        assert_eq!(row.get_long(3).unwrap(), 999);
 
         // Lookup in non-existent partition should return empty result
         let mut non_existent_key = GenericRow::new(4);
@@ -602,7 +588,7 @@ mod kv_table_test {
             .get_single_row()
             .expect("Failed to get row")
             .expect("Row should exist");
-        assert_eq!(row.get_string(2), "Maelle");
+        assert_eq!(row.get_string(2).unwrap(), "Maelle");
 
         admin
             .drop_table(&table_path, false)
@@ -732,62 +718,88 @@ mod kv_table_test {
             .expect("Row should exist");
 
         // Verify all datatypes
-        assert_eq!(found_row.get_int(0), pk_int, "pk_int mismatch");
+        assert_eq!(found_row.get_int(0).unwrap(), pk_int, "pk_int mismatch");
         assert_eq!(
-            found_row.get_boolean(1),
+            found_row.get_boolean(1).unwrap(),
             col_boolean,
             "col_boolean mismatch"
         );
-        assert_eq!(found_row.get_byte(2), col_tinyint, "col_tinyint mismatch");
         assert_eq!(
-            found_row.get_short(3),
+            found_row.get_byte(2).unwrap(),
+            col_tinyint,
+            "col_tinyint mismatch"
+        );
+        assert_eq!(
+            found_row.get_short(3).unwrap(),
             col_smallint,
             "col_smallint mismatch"
         );
-        assert_eq!(found_row.get_int(4), col_int, "col_int mismatch");
-        assert_eq!(found_row.get_long(5), col_bigint, "col_bigint mismatch");
+        assert_eq!(found_row.get_int(4).unwrap(), col_int, "col_int mismatch");
+        assert_eq!(
+            found_row.get_long(5).unwrap(),
+            col_bigint,
+            "col_bigint mismatch"
+        );
         assert!(
-            (found_row.get_float(6) - col_float).abs() < f32::EPSILON,
+            (found_row.get_float(6).unwrap() - col_float).abs() < f32::EPSILON,
             "col_float mismatch: expected {}, got {}",
             col_float,
-            found_row.get_float(6)
+            found_row.get_float(6).unwrap()
         );
         assert!(
-            (found_row.get_double(7) - col_double).abs() < f64::EPSILON,
+            (found_row.get_double(7).unwrap() - col_double).abs() < 
f64::EPSILON,
             "col_double mismatch: expected {}, got {}",
             col_double,
-            found_row.get_double(7)
+            found_row.get_double(7).unwrap()
+        );
+        assert_eq!(
+            found_row.get_char(8, 10).unwrap(),
+            col_char,
+            "col_char mismatch"
+        );
+        assert_eq!(
+            found_row.get_string(9).unwrap(),
+            col_string,
+            "col_string mismatch"
         );
-        assert_eq!(found_row.get_char(8, 10), col_char, "col_char mismatch");
-        assert_eq!(found_row.get_string(9), col_string, "col_string mismatch");
         assert_eq!(
-            found_row.get_decimal(10, 10, 2),
+            found_row.get_decimal(10, 10, 2).unwrap(),
             col_decimal,
             "col_decimal mismatch"
         );
         assert_eq!(
-            found_row.get_date(11).get_inner(),
+            found_row.get_date(11).unwrap().get_inner(),
             col_date.get_inner(),
             "col_date mismatch"
         );
         assert_eq!(
-            found_row.get_time(12).get_inner(),
+            found_row.get_time(12).unwrap().get_inner(),
             col_time.get_inner(),
             "col_time mismatch"
         );
         assert_eq!(
-            found_row.get_timestamp_ntz(13, 6).get_millisecond(),
+            found_row
+                .get_timestamp_ntz(13, 6)
+                .unwrap()
+                .get_millisecond(),
             col_timestamp.get_millisecond(),
             "col_timestamp mismatch"
         );
         assert_eq!(
-            found_row.get_timestamp_ltz(14, 6).get_epoch_millisecond(),
+            found_row
+                .get_timestamp_ltz(14, 6)
+                .unwrap()
+                .get_epoch_millisecond(),
             col_timestamp_ltz.get_epoch_millisecond(),
             "col_timestamp_ltz mismatch"
         );
-        assert_eq!(found_row.get_bytes(15), col_bytes, "col_bytes mismatch");
         assert_eq!(
-            found_row.get_binary(16, 20),
+            found_row.get_bytes(15).unwrap(),
+            col_bytes,
+            "col_bytes mismatch"
+        );
+        assert_eq!(
+            found_row.get_binary(16, 20).unwrap(),
             col_binary,
             "col_binary mismatch"
         );
@@ -830,29 +842,75 @@ mod kv_table_test {
             .expect("Row should exist");
 
         // Verify all nullable columns are null
-        assert_eq!(found_row_nulls.get_int(0), pk_int_2, "pk_int mismatch");
-        assert!(found_row_nulls.is_null_at(1), "col_boolean should be null");
-        assert!(found_row_nulls.is_null_at(2), "col_tinyint should be null");
-        assert!(found_row_nulls.is_null_at(3), "col_smallint should be null");
-        assert!(found_row_nulls.is_null_at(4), "col_int should be null");
-        assert!(found_row_nulls.is_null_at(5), "col_bigint should be null");
-        assert!(found_row_nulls.is_null_at(6), "col_float should be null");
-        assert!(found_row_nulls.is_null_at(7), "col_double should be null");
-        assert!(found_row_nulls.is_null_at(8), "col_char should be null");
-        assert!(found_row_nulls.is_null_at(9), "col_string should be null");
-        assert!(found_row_nulls.is_null_at(10), "col_decimal should be null");
-        assert!(found_row_nulls.is_null_at(11), "col_date should be null");
-        assert!(found_row_nulls.is_null_at(12), "col_time should be null");
+        assert_eq!(
+            found_row_nulls.get_int(0).unwrap(),
+            pk_int_2,
+            "pk_int mismatch"
+        );
+        assert!(
+            found_row_nulls.is_null_at(1).unwrap(),
+            "col_boolean should be null"
+        );
         assert!(
-            found_row_nulls.is_null_at(13),
+            found_row_nulls.is_null_at(2).unwrap(),
+            "col_tinyint should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(3).unwrap(),
+            "col_smallint should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(4).unwrap(),
+            "col_int should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(5).unwrap(),
+            "col_bigint should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(6).unwrap(),
+            "col_float should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(7).unwrap(),
+            "col_double should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(8).unwrap(),
+            "col_char should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(9).unwrap(),
+            "col_string should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(10).unwrap(),
+            "col_decimal should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(11).unwrap(),
+            "col_date should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(12).unwrap(),
+            "col_time should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(13).unwrap(),
             "col_timestamp should be null"
         );
         assert!(
-            found_row_nulls.is_null_at(14),
+            found_row_nulls.is_null_at(14).unwrap(),
             "col_timestamp_ltz should be null"
         );
-        assert!(found_row_nulls.is_null_at(15), "col_bytes should be null");
-        assert!(found_row_nulls.is_null_at(16), "col_binary should be null");
+        assert!(
+            found_row_nulls.is_null_at(15).unwrap(),
+            "col_bytes should be null"
+        );
+        assert!(
+            found_row_nulls.is_null_at(16).unwrap(),
+            "col_binary should be null"
+        );
 
         admin
             .drop_table(&table_path, false)
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index eac72e5..779ffdd 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -138,7 +138,10 @@ mod table_test {
                 .expect("Failed to poll records");
             for rec in scan_records {
                 let row = rec.row();
-                collected.push((row.get_int(0), 
row.get_string(1).to_string()));
+                collected.push((
+                    row.get_int(0).unwrap(),
+                    row.get_string(1).unwrap().to_string(),
+                ));
             }
         }
 
@@ -362,13 +365,13 @@ mod table_test {
             let row = record.row();
             // col_b is now at index 0, col_c is at index 1
             assert_eq!(
-                row.get_string(0),
+                row.get_string(0).unwrap(),
                 expected_col_b[i],
                 "col_b mismatch at index {}",
                 i
             );
             assert_eq!(
-                row.get_int(1),
+                row.get_int(1).unwrap(),
                 expected_col_c[i],
                 "col_c mismatch at index {}",
                 i
@@ -394,13 +397,13 @@ mod table_test {
             let row = record.row();
             // col_b is now at index 0, col_c is at index 1
             assert_eq!(
-                row.get_string(0),
+                row.get_string(0).unwrap(),
                 expected_col_b[i],
                 "col_b mismatch at index {}",
                 i
             );
             assert_eq!(
-                row.get_int(1),
+                row.get_int(1).unwrap(),
                 expected_col_a[i],
                 "col_c mismatch at index {}",
                 i
@@ -777,81 +780,103 @@ mod table_test {
         assert_eq!(records.len(), 2, "Expected 2 records");
 
         let found_row = records[0].row();
-        assert_eq!(found_row.get_byte(0), col_tinyint, "col_tinyint mismatch");
         assert_eq!(
-            found_row.get_short(1),
+            found_row.get_byte(0).unwrap(),
+            col_tinyint,
+            "col_tinyint mismatch"
+        );
+        assert_eq!(
+            found_row.get_short(1).unwrap(),
             col_smallint,
             "col_smallint mismatch"
         );
-        assert_eq!(found_row.get_int(2), col_int, "col_int mismatch");
-        assert_eq!(found_row.get_long(3), col_bigint, "col_bigint mismatch");
+        assert_eq!(found_row.get_int(2).unwrap(), col_int, "col_int mismatch");
+        assert_eq!(
+            found_row.get_long(3).unwrap(),
+            col_bigint,
+            "col_bigint mismatch"
+        );
         assert!(
-            (found_row.get_float(4) - col_float).abs() < f32::EPSILON,
+            (found_row.get_float(4).unwrap() - col_float).abs() < f32::EPSILON,
             "col_float mismatch: expected {}, got {}",
             col_float,
-            found_row.get_float(4)
+            found_row.get_float(4).unwrap()
         );
         assert!(
-            (found_row.get_double(5) - col_double).abs() < f64::EPSILON,
+            (found_row.get_double(5).unwrap() - col_double).abs() < 
f64::EPSILON,
             "col_double mismatch: expected {}, got {}",
             col_double,
-            found_row.get_double(5)
+            found_row.get_double(5).unwrap()
         );
         assert_eq!(
-            found_row.get_boolean(6),
+            found_row.get_boolean(6).unwrap(),
             col_boolean,
             "col_boolean mismatch"
         );
-        assert_eq!(found_row.get_char(7, 10), col_char, "col_char mismatch");
-        assert_eq!(found_row.get_string(8), col_string, "col_string mismatch");
         assert_eq!(
-            found_row.get_decimal(9, 10, 2),
+            found_row.get_char(7, 10).unwrap(),
+            col_char,
+            "col_char mismatch"
+        );
+        assert_eq!(
+            found_row.get_string(8).unwrap(),
+            col_string,
+            "col_string mismatch"
+        );
+        assert_eq!(
+            found_row.get_decimal(9, 10, 2).unwrap(),
             col_decimal,
             "col_decimal mismatch"
         );
         assert_eq!(
-            found_row.get_date(10).get_inner(),
+            found_row.get_date(10).unwrap().get_inner(),
             col_date.get_inner(),
             "col_date mismatch"
         );
 
         assert_eq!(
-            found_row.get_time(11).get_inner(),
+            found_row.get_time(11).unwrap().get_inner(),
             col_time_s.get_inner(),
             "col_time_s mismatch"
         );
 
         assert_eq!(
-            found_row.get_time(12).get_inner(),
+            found_row.get_time(12).unwrap().get_inner(),
             col_time_ms.get_inner(),
             "col_time_ms mismatch"
         );
 
         assert_eq!(
-            found_row.get_time(13).get_inner(),
+            found_row.get_time(13).unwrap().get_inner(),
             col_time_us.get_inner(),
             "col_time_us mismatch"
         );
 
         assert_eq!(
-            found_row.get_time(14).get_inner(),
+            found_row.get_time(14).unwrap().get_inner(),
             col_time_ns.get_inner(),
             "col_time_ns mismatch"
         );
 
         assert_eq!(
-            found_row.get_timestamp_ntz(15, 0).get_millisecond(),
+            found_row
+                .get_timestamp_ntz(15, 0)
+                .unwrap()
+                .get_millisecond(),
             col_timestamp_s.get_millisecond(),
             "col_timestamp_s mismatch"
         );
 
         assert_eq!(
-            found_row.get_timestamp_ntz(16, 3).get_millisecond(),
+            found_row
+                .get_timestamp_ntz(16, 3)
+                .unwrap()
+                .get_millisecond(),
             col_timestamp_ms.get_millisecond(),
             "col_timestamp_ms mismatch"
         );
 
-        let read_ts_us = found_row.get_timestamp_ntz(17, 6);
+        let read_ts_us = found_row.get_timestamp_ntz(17, 6).unwrap();
         assert_eq!(
             read_ts_us.get_millisecond(),
             col_timestamp_us.get_millisecond(),
@@ -863,7 +888,7 @@ mod table_test {
             "col_timestamp_us nanos mismatch"
         );
 
-        let read_ts_ns = found_row.get_timestamp_ntz(18, 9);
+        let read_ts_ns = found_row.get_timestamp_ntz(18, 9).unwrap();
         assert_eq!(
             read_ts_ns.get_millisecond(),
             col_timestamp_ns.get_millisecond(),
@@ -876,18 +901,24 @@ mod table_test {
         );
 
         assert_eq!(
-            found_row.get_timestamp_ltz(19, 0).get_epoch_millisecond(),
+            found_row
+                .get_timestamp_ltz(19, 0)
+                .unwrap()
+                .get_epoch_millisecond(),
             col_timestamp_ltz_s.get_epoch_millisecond(),
             "col_timestamp_ltz_s mismatch"
         );
 
         assert_eq!(
-            found_row.get_timestamp_ltz(20, 3).get_epoch_millisecond(),
+            found_row
+                .get_timestamp_ltz(20, 3)
+                .unwrap()
+                .get_epoch_millisecond(),
             col_timestamp_ltz_ms.get_epoch_millisecond(),
             "col_timestamp_ltz_ms mismatch"
         );
 
-        let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6);
+        let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6).unwrap();
         assert_eq!(
             read_ts_ltz_us.get_epoch_millisecond(),
             col_timestamp_ltz_us.get_epoch_millisecond(),
@@ -899,7 +930,7 @@ mod table_test {
             "col_timestamp_ltz_us nanos mismatch"
         );
 
-        let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9);
+        let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9).unwrap();
         assert_eq!(
             read_ts_ltz_ns.get_epoch_millisecond(),
             col_timestamp_ltz_ns.get_epoch_millisecond(),
@@ -910,15 +941,19 @@ mod table_test {
             col_timestamp_ltz_ns.get_nano_of_millisecond(),
             "col_timestamp_ltz_ns nanos mismatch"
         );
-        assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch");
         assert_eq!(
-            found_row.get_binary(24, 4),
+            found_row.get_bytes(23).unwrap(),
+            col_bytes,
+            "col_bytes mismatch"
+        );
+        assert_eq!(
+            found_row.get_binary(24, 4).unwrap(),
             col_binary,
             "col_binary mismatch"
         );
 
         // Verify timestamps before Unix epoch (negative timestamps)
-        let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6);
+        let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6).unwrap();
         assert_eq!(
             read_ts_us_neg.get_millisecond(),
             col_timestamp_us_neg.get_millisecond(),
@@ -930,7 +965,7 @@ mod table_test {
             "col_timestamp_us_neg nanos mismatch"
         );
 
-        let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9);
+        let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9).unwrap();
         assert_eq!(
             read_ts_ns_neg.get_millisecond(),
             col_timestamp_ns_neg.get_millisecond(),
@@ -942,7 +977,7 @@ mod table_test {
             "col_timestamp_ns_neg nanos mismatch"
         );
 
-        let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6);
+        let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6).unwrap();
         assert_eq!(
             read_ts_ltz_us_neg.get_epoch_millisecond(),
             col_timestamp_ltz_us_neg.get_epoch_millisecond(),
@@ -954,7 +989,7 @@ mod table_test {
             "col_timestamp_ltz_us_neg nanos mismatch"
         );
 
-        let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9);
+        let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9).unwrap();
         assert_eq!(
             read_ts_ltz_ns_neg.get_epoch_millisecond(),
             col_timestamp_ltz_ns_neg.get_epoch_millisecond(),
@@ -969,7 +1004,11 @@ mod table_test {
         // Verify row with all nulls (record index 1)
         let found_row_nulls = records[1].row();
         for i in 0..field_count {
-            assert!(found_row_nulls.is_null_at(i), "column {} should be null", 
i);
+            assert!(
+                found_row_nulls.is_null_at(i).unwrap(),
+                "column {} should be null",
+                i
+            );
         }
 
         admin
@@ -1140,9 +1179,9 @@ mod table_test {
             for rec in records {
                 let row = rec.row();
                 collected_records.push((
-                    row.get_int(0),
-                    row.get_string(1).to_string(),
-                    row.get_long(2),
+                    row.get_int(0).unwrap(),
+                    row.get_string(1).unwrap().to_string(),
+                    row.get_long(2).unwrap(),
                 ));
             }
         }
@@ -1196,9 +1235,9 @@ mod table_test {
             for rec in records {
                 let row = rec.row();
                 records_after_unsubscribe.push((
-                    row.get_int(0),
-                    row.get_string(1).to_string(),
-                    row.get_long(2),
+                    row.get_int(0).unwrap(),
+                    row.get_string(1).unwrap().to_string(),
+                    row.get_long(2).unwrap(),
                 ));
             }
         }
@@ -1248,9 +1287,9 @@ mod table_test {
             for rec in records {
                 let row = rec.row();
                 batch_collected.push((
-                    row.get_int(0),
-                    row.get_string(1).to_string(),
-                    row.get_long(2),
+                    row.get_int(0).unwrap(),
+                    row.get_string(1).unwrap().to_string(),
+                    row.get_long(2).unwrap(),
                 ));
             }
         }
@@ -1272,4 +1311,73 @@ mod table_test {
             .await
             .expect("Failed to drop table");
     }
+
+    #[tokio::test]
+    async fn undersized_row_returns_error() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss", "test_log_undersized_row");
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("col_bool", DataTypes::boolean())
+                    .column("col_int", DataTypes::int())
+                    .column("col_string", DataTypes::string())
+                    .column("col_bigint", DataTypes::bigint())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create table append")
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Scenario 1b: GenericRow with only 2 fields for a 4-column table
+        let mut row = fluss::row::GenericRow::new(2);
+        row.set_field(0, true);
+        row.set_field(1, 42_i32);
+
+        let result = append_writer.append(&row);
+        assert!(result.is_err(), "Undersized row should be rejected");
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("Expected: 4") && err_msg.contains("Actual: 2"),
+            "Error should mention field count mismatch, got: {err_msg}"
+        );
+
+        // Correct column count but wrong types:
+        // Schema is (Boolean, Int, String, BigInt) but we put Int64 where 
String is expected.
+        // This should return an error, not panic.
+        let row_wrong_types = fluss::row::GenericRow::from_data(vec![
+            fluss::row::Datum::Bool(true),
+            fluss::row::Datum::Int32(42),
+            fluss::row::Datum::Int64(999), // wrong: String column
+            fluss::row::Datum::Int64(100),
+        ]);
+
+        let result = append_writer.append(&row_wrong_types);
+        assert!(
+            result.is_err(),
+            "Row with mismatched types should be rejected, not panic"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
 }
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index 210dfc4..fcd6773 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -192,8 +192,18 @@ mod table_remote_scan_test {
             let row = record.row();
             let expected_c1 = i as i32;
             let expected_c2 = format!("v{}", i);
-            assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}", 
i);
-            assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index 
{}", i);
+            assert_eq!(
+                row.get_int(1).unwrap(),
+                expected_c1,
+                "c1 mismatch at index {}",
+                i
+            );
+            assert_eq!(
+                row.get_string(0).unwrap(),
+                expected_c2,
+                "c2 mismatch at index {}",
+                i
+            );
         }
     }
 

Reply via email to