This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c7ad66d chore: Fix panic / crashes in Rust and C++ (#365)
c7ad66d is described below
commit c7ad66d5bd5918408618db9f214a03abc82857c8
Author: Keith Lee <[email protected]>
AuthorDate: Fri Feb 27 01:58:28 2026 +0000
chore: Fix panic / crashes in Rust and C++ (#365)
---
bindings/cpp/src/lib.rs | 80 ++--
bindings/cpp/src/types.rs | 34 +-
bindings/python/src/table.rs | 56 ++-
crates/examples/src/example_kv_table.rs | 8 +-
.../examples/src/example_partitioned_kv_table.rs | 10 +-
crates/examples/src/example_table.rs | 6 +-
crates/fluss/src/client/table/append.rs | 17 +
crates/fluss/src/client/table/partition_getter.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 10 +-
crates/fluss/src/client/table/upsert.rs | 2 +-
crates/fluss/src/record/arrow.rs | 2 +-
crates/fluss/src/record/kv/kv_record_batch.rs | 2 +-
.../fluss/src/record/kv/kv_record_batch_builder.rs | 8 +-
crates/fluss/src/row/column.rs | 465 +++++++++++----------
crates/fluss/src/row/compacted/compacted_row.rs | 105 ++---
.../fluss/src/row/encode/compacted_key_encoder.rs | 2 +-
crates/fluss/src/row/field_getter.rs | 45 +-
crates/fluss/src/row/mod.rs | 231 ++++++----
crates/fluss/src/row/row_decoder.rs | 8 +-
crates/fluss/tests/integration/kv_table.rs | 266 +++++++-----
crates/fluss/tests/integration/log_table.rs | 200 +++++++--
.../fluss/tests/integration/table_remote_scan.rs | 14 +-
22 files changed, 977 insertions(+), 596 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9fbdc8f..32dbf7d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -1784,7 +1784,7 @@ mod row_reader {
allowed: impl FnOnce(&fcore::metadata::DataType) -> bool,
) -> Result<&'a fcore::metadata::DataType, String> {
let col = get_column(columns, field)?;
- if row.is_null_at(field) {
+ if row.is_null_at(field).map_err(|e| e.to_string())? {
return Err(format!("field {field} is null"));
}
let dt = col.data_type();
@@ -1812,7 +1812,7 @@ mod row_reader {
field: usize,
) -> Result<bool, String> {
get_column(columns, field)?;
- Ok(row.is_null_at(field))
+ row.is_null_at(field).map_err(|e| e.to_string())
}
pub fn get_bool(
@@ -1823,7 +1823,7 @@ mod row_reader {
validate(row, columns, field, "get_bool", |dt| {
matches!(dt, fcore::metadata::DataType::Boolean(_))
})?;
- Ok(row.get_boolean(field))
+ row.get_boolean(field).map_err(|e| e.to_string())
}
pub fn get_i32(
@@ -1839,11 +1839,17 @@ mod row_reader {
| fcore::metadata::DataType::Int(_)
)
})?;
-        Ok(match dt {
-            fcore::metadata::DataType::TinyInt(_) => row.get_byte(field) as i32,
-            fcore::metadata::DataType::SmallInt(_) => row.get_short(field) as i32,
-            _ => row.get_int(field),
-        })
+ match dt {
+ fcore::metadata::DataType::TinyInt(_) => row
+ .get_byte(field)
+ .map(|v| v as i32)
+ .map_err(|e| e.to_string()),
+ fcore::metadata::DataType::SmallInt(_) => row
+ .get_short(field)
+ .map(|v| v as i32)
+ .map_err(|e| e.to_string()),
+ _ => row.get_int(field).map_err(|e| e.to_string()),
+ }
}
pub fn get_i64(
@@ -1854,7 +1860,7 @@ mod row_reader {
validate(row, columns, field, "get_i64", |dt| {
matches!(dt, fcore::metadata::DataType::BigInt(_))
})?;
- Ok(row.get_long(field))
+ row.get_long(field).map_err(|e| e.to_string())
}
pub fn get_f32(
@@ -1865,7 +1871,7 @@ mod row_reader {
validate(row, columns, field, "get_f32", |dt| {
matches!(dt, fcore::metadata::DataType::Float(_))
})?;
- Ok(row.get_float(field))
+ row.get_float(field).map_err(|e| e.to_string())
}
pub fn get_f64(
@@ -1876,7 +1882,7 @@ mod row_reader {
validate(row, columns, field, "get_f64", |dt| {
matches!(dt, fcore::metadata::DataType::Double(_))
})?;
- Ok(row.get_double(field))
+ row.get_double(field).map_err(|e| e.to_string())
}
pub fn get_str<'a>(
@@ -1890,10 +1896,12 @@ mod row_reader {
fcore::metadata::DataType::Char(_) |
fcore::metadata::DataType::String(_)
)
})?;
-        Ok(match dt {
-            fcore::metadata::DataType::Char(ct) => row.get_char(field, ct.length() as usize),
-            _ => row.get_string(field),
-        })
+ match dt {
+ fcore::metadata::DataType::Char(ct) => row
+ .get_char(field, ct.length() as usize)
+ .map_err(|e| e.to_string()),
+ _ => row.get_string(field).map_err(|e| e.to_string()),
+ }
}
pub fn get_bytes<'a>(
@@ -1907,10 +1915,12 @@ mod row_reader {
fcore::metadata::DataType::Binary(_) |
fcore::metadata::DataType::Bytes(_)
)
})?;
-        Ok(match dt {
-            fcore::metadata::DataType::Binary(bt) => row.get_binary(field, bt.length()),
-            _ => row.get_bytes(field),
-        })
+ match dt {
+ fcore::metadata::DataType::Binary(bt) => row
+ .get_binary(field, bt.length())
+ .map_err(|e| e.to_string()),
+ _ => row.get_bytes(field).map_err(|e| e.to_string()),
+ }
}
pub fn get_date_days(
@@ -1921,7 +1931,9 @@ mod row_reader {
validate(row, columns, field, "get_date_days", |dt| {
matches!(dt, fcore::metadata::DataType::Date(_))
})?;
- Ok(row.get_date(field).get_inner())
+ row.get_date(field)
+ .map(|d| d.get_inner())
+ .map_err(|e| e.to_string())
}
pub fn get_time_millis(
@@ -1932,7 +1944,9 @@ mod row_reader {
validate(row, columns, field, "get_time_millis", |dt| {
matches!(dt, fcore::metadata::DataType::Time(_))
})?;
- Ok(row.get_time(field).get_inner())
+ row.get_time(field)
+ .map(|t| t.get_inner())
+ .map_err(|e| e.to_string())
}
pub fn get_ts_millis(
@@ -1948,12 +1962,14 @@ mod row_reader {
)
})?;
match dt {
- fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
+ fcore::metadata::DataType::TimestampLTz(ts) => row
.get_timestamp_ltz(field, ts.precision())
- .get_epoch_millisecond()),
- fcore::metadata::DataType::Timestamp(ts) => Ok(row
+ .map(|v| v.get_epoch_millisecond())
+ .map_err(|e| e.to_string()),
+ fcore::metadata::DataType::Timestamp(ts) => row
.get_timestamp_ntz(field, ts.precision())
- .get_millisecond()),
+ .map(|v| v.get_millisecond())
+ .map_err(|e| e.to_string()),
dt => Err(format!("get_ts_millis: unexpected type {dt}")),
}
}
@@ -1971,12 +1987,14 @@ mod row_reader {
)
})?;
match dt {
- fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
+ fcore::metadata::DataType::TimestampLTz(ts) => row
.get_timestamp_ltz(field, ts.precision())
- .get_nano_of_millisecond()),
- fcore::metadata::DataType::Timestamp(ts) => Ok(row
+ .map(|v| v.get_nano_of_millisecond())
+ .map_err(|e| e.to_string()),
+ fcore::metadata::DataType::Timestamp(ts) => row
.get_timestamp_ntz(field, ts.precision())
- .get_nano_of_millisecond()),
+ .map(|v| v.get_nano_of_millisecond())
+ .map_err(|e| e.to_string()),
dt => Err(format!("get_ts_nanos: unexpected type {dt}")),
}
}
@@ -1998,7 +2016,9 @@ mod row_reader {
})?;
match dt {
fcore::metadata::DataType::Decimal(dd) => {
-            let decimal = row.get_decimal(field, dd.precision() as usize, dd.scale() as usize);
+            let decimal = row
+                .get_decimal(field, dd.precision() as usize, dd.scale() as usize)
+                .map_err(|e| e.to_string())?;
Ok(decimal.to_big_decimal().to_string())
}
dt => Err(format!("get_decimal_str: unexpected type {dt}")),
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 073a168..f8efe67 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -371,42 +371,42 @@ pub fn compacted_row_to_owned(
let mut out = fcore::row::GenericRow::new(columns.len());
for (i, col) in columns.iter().enumerate() {
- if row.is_null_at(i) {
+ if row.is_null_at(i)? {
out.set_field(i, Datum::Null);
continue;
}
let datum = match col.data_type() {
-            fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)),
-            fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)),
-            fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)),
-            fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)),
-            fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)),
-            fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i).into()),
-            fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i).into()),
+            fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)?),
+            fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)?),
+            fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)?),
+            fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?),
+            fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)?),
+            fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i)?.into()),
+            fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i)?.into()),
fcore::metadata::DataType::String(_) => {
- Datum::String(Cow::Owned(row.get_string(i).to_string()))
+ Datum::String(Cow::Owned(row.get_string(i)?.to_string()))
}
fcore::metadata::DataType::Bytes(_) => {
- Datum::Blob(Cow::Owned(row.get_bytes(i).to_vec()))
+ Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec()))
}
- fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)),
- fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)),
+            fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)?),
+            fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)?),
fcore::metadata::DataType::Timestamp(dt) => {
- Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision()))
+ Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?)
}
fcore::metadata::DataType::TimestampLTz(dt) => {
- Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision()))
+ Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?)
}
fcore::metadata::DataType::Decimal(dt) => {
-                let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize);
+                let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize)?;
Datum::Decimal(decimal)
}
fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned(
- row.get_char(i, dt.length() as usize).to_string(),
+ row.get_char(i, dt.length() as usize)?.to_string(),
)),
fcore::metadata::DataType::Binary(dt) => {
-                Datum::Blob(Cow::Owned(row.get_binary(i, dt.length()).to_vec()))
+                Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index bc2e956..660cd6b 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1256,91 +1256,119 @@ pub fn datum_to_python_value(
use fcore::metadata::DataType;
// Check for null first
- if row.is_null_at(pos) {
+ if row
+ .is_null_at(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
+ {
return Ok(py.None());
}
match data_type {
DataType::Boolean(_) => Ok(row
.get_boolean(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::TinyInt(_) => Ok(row
.get_byte(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::SmallInt(_) => Ok(row
.get_short(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Int(_) => Ok(row
.get_int(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::BigInt(_) => Ok(row
.get_long(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Float(_) => Ok(row
.get_float(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Double(_) => Ok(row
.get_double(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::String(_) => {
- let s = row.get_string(pos);
+ let s = row
+ .get_string(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?;
Ok(s.into_pyobject(py)?.into_any().unbind())
}
DataType::Char(char_type) => {
- let s = row.get_char(pos, char_type.length() as usize);
+ let s = row
+ .get_char(pos, char_type.length() as usize)
+ .map_err(|e| FlussError::from_core_error(&e))?;
Ok(s.into_pyobject(py)?.into_any().unbind())
}
DataType::Bytes(_) => {
- let b = row.get_bytes(pos);
+ let b = row
+ .get_bytes(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?;
Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Binary(binary_type) => {
- let b = row.get_binary(pos, binary_type.length());
+ let b = row
+ .get_binary(pos, binary_type.length())
+ .map_err(|e| FlussError::from_core_error(&e))?;
Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Decimal(decimal_type) => {
- let decimal = row.get_decimal(
- pos,
- decimal_type.precision() as usize,
- decimal_type.scale() as usize,
- );
+ let decimal = row
+ .get_decimal(
+ pos,
+ decimal_type.precision() as usize,
+ decimal_type.scale() as usize,
+ )
+ .map_err(|e| FlussError::from_core_error(&e))?;
rust_decimal_to_python(py, &decimal)
}
DataType::Date(_) => {
- let date = row.get_date(pos);
+ let date = row
+ .get_date(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?;
rust_date_to_python(py, date)
}
DataType::Time(_) => {
- let time = row.get_time(pos);
+ let time = row
+ .get_time(pos)
+ .map_err(|e| FlussError::from_core_error(&e))?;
rust_time_to_python(py, time)
}
DataType::Timestamp(ts_type) => {
- let ts = row.get_timestamp_ntz(pos, ts_type.precision());
+ let ts = row
+ .get_timestamp_ntz(pos, ts_type.precision())
+ .map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ntz_to_python(py, ts)
}
DataType::TimestampLTz(ts_type) => {
- let ts = row.get_timestamp_ltz(pos, ts_type.precision());
+ let ts = row
+ .get_timestamp_ltz(pos, ts_type.precision())
+ .map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ltz_to_python(py, ts)
}
_ => Err(FlussError::new_err(format!(
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 90788b1..8fb60ba 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -75,8 +75,8 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Found id={id}: name={}, age={}",
- row.get_string(1),
- row.get_long(2)
+ row.get_string(1)?,
+ row.get_long(2)?
);
}
@@ -92,8 +92,8 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Verified update: name={}, age={}",
- row.get_string(1),
- row.get_long(2)
+ row.get_string(1)?,
+ row.get_long(2)?
);
println!("\n=== Deleting ===");
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index e047178..9cd2e7d 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -90,9 +90,9 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Found id={id}: region={}, zone={}, score={}",
- row.get_string(1),
- row.get_long(2),
- row.get_long(3)
+ row.get_string(1)?,
+ row.get_long(2)?,
+ row.get_long(3)?
);
}
@@ -109,8 +109,8 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Verified update: region={}, zone={}",
- row.get_string(1),
- row.get_long(2)
+ row.get_string(1)?,
+ row.get_long(2)?
);
println!("\n=== Deleting ===");
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index cfe1627..e4ad1fb 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -83,9 +83,9 @@ pub async fn main() -> Result<()> {
let row = record.row();
println!(
"{{{}, {}, {}}}@{}",
- row.get_int(0),
- row.get_string(1),
- row.get_long(2),
+ row.get_int(0)?,
+ row.get_string(1)?,
+ row.get_long(2)?,
record.offset()
);
}
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index 942253f..a58433f 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -17,6 +17,7 @@
use crate::client::table::partition_getter::{PartitionGetter,
get_physical_path};
use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
+use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
use crate::row::{ColumnarRow, InternalRow};
@@ -69,6 +70,21 @@ pub struct AppendWriter {
}
impl AppendWriter {
+ fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
+ let expected = self.table_info.get_row_type().fields().len();
+ if row.get_field_count() != expected {
+ return Err(IllegalArgument {
+ message: format!(
+                    "The field count of the row does not match the table schema. \
+                    Expected: {}, Actual: {}",
+ expected,
+ row.get_field_count()
+ ),
+ });
+ }
+ Ok(())
+ }
+
/// Appends a row to the table.
///
/// This method returns a [`WriteResultFuture`] immediately after queueing
the write,
@@ -81,6 +97,7 @@ impl AppendWriter {
/// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
/// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture>
{
+ self.check_field_count(row)?;
let physical_table_path = Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
diff --git a/crates/fluss/src/client/table/partition_getter.rs
b/crates/fluss/src/client/table/partition_getter.rs
index a1aad2d..1115ded 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -87,7 +87,7 @@ impl PartitionGetter {
let mut partition_values = Vec::with_capacity(self.partitions.len());
for (data_type, field_getter) in &self.partitions {
- let value = field_getter.get_field(row);
+ let value = field_getter.get_field(row)?;
if value.is_null() {
return Err(IllegalArgument {
message: "Partition value shouldn't be null.".to_string(),
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 4b6f809..3ec9106 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -114,9 +114,9 @@ impl<'a> TableScan<'a> {
/// let row = record.row();
/// println!(
/// "{{{}, {}, {}}}@{}",
- /// row.get_int(0),
- /// row.get_string(2),
- /// row.get_string(3),
+ /// row.get_int(0)?,
+ /// row.get_string(2)?,
+ /// row.get_string(3)?,
/// record.offset()
/// );
/// }
@@ -188,8 +188,8 @@ impl<'a> TableScan<'a> {
/// let row = record.row();
/// println!(
/// "{{{}, {}}}@{}",
- /// row.get_int(0),
- /// row.get_string(1),
+ /// row.get_int(0)?,
+ /// row.get_string(1)?,
/// record.offset()
/// );
/// }
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index 7057b90..52ec37b 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -328,7 +328,7 @@ impl UpsertWriter {
})?;
encoder.start_new_row()?;
for (pos, field_getter) in self.field_getters.iter().enumerate() {
- let datum = field_getter.get_field(row);
+ let datum = field_getter.get_field(row)?;
encoder.encode_field(pos, datum)?;
}
encoder.finish_row()
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 7fb9d34..ea27836 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -355,7 +355,7 @@ impl ArrowRecordBatchInnerBuilder for
RowAppendRecordBatchBuilder {
fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
for (idx, getter) in self.field_getters.iter().enumerate() {
- let datum = getter.get_field(row);
+ let datum = getter.get_field(row)?;
let field_type = self.table_schema.field(idx).data_type();
let builder =
self.arrow_column_builders
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
index eb89d69..14ff2e9 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -445,7 +445,7 @@ mod tests {
assert_eq!(record1.key().as_ref(), key1);
assert!(!record1.is_deletion());
let row1 = record1.row(&*decoder).unwrap();
- assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]);
+ assert_eq!(row1.get_bytes(0).unwrap(), &[1, 2, 3, 4, 5]);
let record2 = iter.next().unwrap().unwrap();
assert_eq!(record2.key().as_ref(), key2);
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 8370764..0e80633 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -555,14 +555,14 @@ mod tests {
1 => {
assert_eq!(rec.key().as_ref(), key1);
let row = rec.row(&*decoder).unwrap();
- assert_eq!(row.get_int(0), 42);
- assert_eq!(row.get_string(1), "hello");
+ assert_eq!(row.get_int(0)?, 42);
+ assert_eq!(row.get_string(1)?, "hello");
}
2 => {
assert_eq!(rec.key().as_ref(), key2);
let row = rec.row(&*decoder).unwrap();
- assert_eq!(row.get_int(0), 100);
- assert_eq!(row.get_string(1), "world");
+ assert_eq!(row.get_int(0)?, 100);
+ assert_eq!(row.get_string(1)?, "world");
}
3 => {
assert_eq!(rec.key().as_ref(), key3);
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 50db32b..c07fe97 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
use crate::row::InternalRow;
-use arrow::array::{
- Array, AsArray, BinaryArray, Date32Array, Decimal128Array,
FixedSizeBinaryArray, Float32Array,
- Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch,
StringArray,
- Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray,
- TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray,
- TimestampSecondArray,
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray};
+use arrow::datatypes::{
+ DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type,
Float64Type, Int8Type,
+ Int16Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType,
+ Time64MicrosecondType, Time64NanosecondType, TimeUnit,
TimestampMicrosecondType,
+ TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
-use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use std::sync::Arc;
#[derive(Clone)]
@@ -59,6 +61,18 @@ impl ColumnarRow {
&self.record_batch
}
+ fn column(&self, pos: usize) -> Result<&Arc<dyn Array>> {
+ self.record_batch
+ .columns()
+ .get(pos)
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "column index {pos} out of bounds (batch has {} columns)",
+ self.record_batch.num_columns()
+ ),
+ })
+ }
+
/// Generic helper to read timestamp from Arrow, handling all TimeUnit
conversions.
/// Like Java, the precision parameter is ignored - conversion is
determined by Arrow TimeUnit.
fn read_timestamp_from_arrow<T>(
@@ -66,114 +80,133 @@ impl ColumnarRow {
pos: usize,
_precision: u32,
construct_compact: impl FnOnce(i64) -> T,
- construct_with_nanos: impl FnOnce(i64, i32) -> crate::error::Result<T>,
- ) -> T {
- let schema = self.record_batch.schema();
- let arrow_field = schema.field(pos);
- let column = self.record_batch.column(pos);
-
- // Read value based on the actual Arrow timestamp type
- let value = match arrow_field.data_type() {
- ArrowDataType::Timestamp(TimeUnit::Second, _) => column
- .as_any()
- .downcast_ref::<TimestampSecondArray>()
- .expect("Expected TimestampSecondArray")
- .value(self.row_id),
- ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column
- .as_any()
- .downcast_ref::<TimestampMillisecondArray>()
- .expect("Expected TimestampMillisecondArray")
- .value(self.row_id),
- ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column
- .as_any()
- .downcast_ref::<TimestampMicrosecondArray>()
- .expect("Expected TimestampMicrosecondArray")
- .value(self.row_id),
- ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column
- .as_any()
- .downcast_ref::<TimestampNanosecondArray>()
- .expect("Expected TimestampNanosecondArray")
- .value(self.row_id),
- other => panic!("Expected Timestamp column at position {pos}, got
{other:?}"),
+ construct_with_nanos: impl FnOnce(i64, i32) -> Result<T>,
+ ) -> Result<T> {
+ let column = self.column(pos)?;
+
+ // Read value and time unit based on the actual Arrow timestamp type
+ let (value, time_unit) = match column.data_type() {
+ ArrowDataType::Timestamp(TimeUnit::Second, _) => (
+ column
+ .as_primitive_opt::<TimestampSecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected TimestampSecondArray at
position {pos}"),
+ })?
+ .value(self.row_id),
+ TimeUnit::Second,
+ ),
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => (
+ column
+ .as_primitive_opt::<TimestampMillisecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected TimestampMillisecondArray
at position {pos}"),
+ })?
+ .value(self.row_id),
+ TimeUnit::Millisecond,
+ ),
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => (
+ column
+ .as_primitive_opt::<TimestampMicrosecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected TimestampMicrosecondArray
at position {pos}"),
+ })?
+ .value(self.row_id),
+ TimeUnit::Microsecond,
+ ),
+ ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => (
+ column
+ .as_primitive_opt::<TimestampNanosecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected TimestampNanosecondArray at
position {pos}"),
+ })?
+ .value(self.row_id),
+ TimeUnit::Nanosecond,
+ ),
+ other => {
+ return Err(IllegalArgument {
+ message: format!("expected Timestamp column at position
{pos}, got {other:?}"),
+ });
+ }
};
// Convert based on Arrow TimeUnit
- let (millis, nanos) = match arrow_field.data_type() {
- ArrowDataType::Timestamp(time_unit, _) => match time_unit {
- TimeUnit::Second => (value * 1000, 0),
- TimeUnit::Millisecond => (value, 0),
- TimeUnit::Microsecond => {
- // Use Euclidean division so that nanos is always
non-negative,
- // even for timestamps before the Unix epoch.
- let millis = value.div_euclid(1000);
- let nanos = (value.rem_euclid(1000) * 1000) as i32;
- (millis, nanos)
- }
- TimeUnit::Nanosecond => {
- // Use Euclidean division so that nanos is always in [0,
999_999].
- let millis = value.div_euclid(1_000_000);
- let nanos = value.rem_euclid(1_000_000) as i32;
- (millis, nanos)
- }
- },
- _ => unreachable!(),
+ let (millis, nanos) = match time_unit {
+ TimeUnit::Second => (value * 1000, 0),
+ TimeUnit::Millisecond => (value, 0),
+ TimeUnit::Microsecond => {
+ // Use Euclidean division so that nanos is always non-negative,
+ // even for timestamps before the Unix epoch.
+ let millis = value.div_euclid(1000);
+ let nanos = (value.rem_euclid(1000) * 1000) as i32;
+ (millis, nanos)
+ }
+ TimeUnit::Nanosecond => {
+ // Use Euclidean division so that nanos is always in [0,
999_999].
+ let millis = value.div_euclid(1_000_000);
+ let nanos = value.rem_euclid(1_000_000) as i32;
+ (millis, nanos)
+ }
};
if nanos == 0 {
- construct_compact(millis)
+ Ok(construct_compact(millis))
} else {
- // nanos is guaranteed to be in valid range [0, 999_999] by
arithmetic
- construct_with_nanos(millis, nanos).expect("nanos in valid range
by construction")
+ construct_with_nanos(millis, nanos)
}
}
/// Read date value from Arrow Date32Array
- fn read_date_from_arrow(&self, pos: usize) -> i32 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Date32Array>()
- .expect("Expected Date32Array")
- .value(self.row_id)
+ fn read_date_from_arrow(&self, pos: usize) -> Result<i32> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Date32Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected Date32Array at position {pos}"),
+ })?
+ .value(self.row_id))
}
/// Read time value from Arrow Time32/Time64 arrays, converting to
milliseconds
- fn read_time_from_arrow(&self, pos: usize) -> i32 {
- let schema = self.record_batch.schema();
- let arrow_field = schema.field(pos);
- let column = self.record_batch.column(pos);
+ fn read_time_from_arrow(&self, pos: usize) -> Result<i32> {
+ let column = self.column(pos)?;
- match arrow_field.data_type() {
+ match column.data_type() {
ArrowDataType::Time32(TimeUnit::Second) => {
let value = column
- .as_any()
- .downcast_ref::<Time32SecondArray>()
- .expect("Expected Time32SecondArray")
+ .as_primitive_opt::<Time32SecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected Time32SecondArray at
position {pos}"),
+ })?
.value(self.row_id);
- value * 1000 // Convert seconds to milliseconds
+ Ok(value * 1000) // Convert seconds to milliseconds
}
- ArrowDataType::Time32(TimeUnit::Millisecond) => column
- .as_any()
- .downcast_ref::<Time32MillisecondArray>()
- .expect("Expected Time32MillisecondArray")
- .value(self.row_id),
+ ArrowDataType::Time32(TimeUnit::Millisecond) => Ok(column
+ .as_primitive_opt::<Time32MillisecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected Time32MillisecondArray at
position {pos}"),
+ })?
+ .value(self.row_id)),
ArrowDataType::Time64(TimeUnit::Microsecond) => {
let value = column
- .as_any()
- .downcast_ref::<Time64MicrosecondArray>()
- .expect("Expected Time64MicrosecondArray")
+ .as_primitive_opt::<Time64MicrosecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected Time64MicrosecondArray at
position {pos}"),
+ })?
.value(self.row_id);
- (value / 1000) as i32 // Convert microseconds to milliseconds
+ Ok((value / 1000) as i32) // Convert microseconds to
milliseconds
}
ArrowDataType::Time64(TimeUnit::Nanosecond) => {
let value = column
- .as_any()
- .downcast_ref::<Time64NanosecondArray>()
- .expect("Expected Time64NanosecondArray")
+ .as_primitive_opt::<Time64NanosecondType>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected Time64NanosecondArray at
position {pos}"),
+ })?
.value(self.row_id);
- (value / 1_000_000) as i32 // Convert nanoseconds to
milliseconds
+ Ok((value / 1_000_000) as i32) // Convert nanoseconds to
milliseconds
}
- other => panic!("Expected Time column at position {pos}, got
{other:?}"),
+ other => Err(IllegalArgument {
+ message: format!("expected Time column at position {pos}, got
{other:?}"),
+ }),
}
}
}
@@ -183,106 +216,121 @@ impl InternalRow for ColumnarRow {
self.record_batch.num_columns()
}
- fn is_null_at(&self, pos: usize) -> bool {
- self.record_batch.column(pos).is_null(self.row_id)
+ fn is_null_at(&self, pos: usize) -> Result<bool> {
+ Ok(self.column(pos)?.is_null(self.row_id))
}
- fn get_boolean(&self, pos: usize) -> bool {
- self.record_batch
- .column(pos)
- .as_boolean()
- .value(self.row_id)
+ fn get_boolean(&self, pos: usize) -> Result<bool> {
+ Ok(self
+ .column(pos)?
+ .as_boolean_opt()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected boolean array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_byte(&self, pos: usize) -> i8 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Int8Array>()
- .expect("Expect byte array")
- .value(self.row_id)
+ fn get_byte(&self, pos: usize) -> Result<i8> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Int8Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected byte array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_short(&self, pos: usize) -> i16 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Int16Array>()
- .expect("Expect short array")
- .value(self.row_id)
+ fn get_short(&self, pos: usize) -> Result<i16> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Int16Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected short array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_int(&self, pos: usize) -> i32 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Int32Array>()
- .expect("Expect int array")
- .value(self.row_id)
+ fn get_int(&self, pos: usize) -> Result<i32> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Int32Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected int array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_long(&self, pos: usize) -> i64 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Int64Array>()
- .expect("Expect long array")
- .value(self.row_id)
+ fn get_long(&self, pos: usize) -> Result<i64> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Int64Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected long array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_float(&self, pos: usize) -> f32 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Float32Array>()
- .expect("Expect float32 array")
- .value(self.row_id)
+ fn get_float(&self, pos: usize) -> Result<f32> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Float32Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected float32 array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_double(&self, pos: usize) -> f64 {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<Float64Array>()
- .expect("Expect float64 array")
- .value(self.row_id)
+ fn get_double(&self, pos: usize) -> Result<f64> {
+ Ok(self
+ .column(pos)?
+ .as_primitive_opt::<Float64Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected float64 array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_char(&self, pos: usize, _length: usize) -> &str {
- self.record_batch
- .column(pos)
+ fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
+ Ok(self
+ .column(pos)?
.as_any()
.downcast_ref::<StringArray>()
- .expect("Expected String array for char type")
- .value(self.row_id)
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected String array for char type at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_string(&self, pos: usize) -> &str {
- self.record_batch
- .column(pos)
+ fn get_string(&self, pos: usize) -> Result<&str> {
+ Ok(self
+ .column(pos)?
.as_any()
.downcast_ref::<StringArray>()
- .expect("Expected String array.")
- .value(self.row_id)
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected String array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
crate::row::Decimal {
+ fn get_decimal(
+ &self,
+ pos: usize,
+ precision: usize,
+ scale: usize,
+ ) -> Result<crate::row::Decimal> {
use arrow::datatypes::DataType;
- let column = self.record_batch.column(pos);
+ let column = self.column(pos)?;
let array = column
- .as_any()
- .downcast_ref::<Decimal128Array>()
- .unwrap_or_else(|| {
- panic!(
- "Expected Decimal128Array at column {}, found: {:?}",
- pos,
+ .as_primitive_opt::<Decimal128Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "expected Decimal128Array at column {pos}, found: {:?}",
column.data_type()
- )
- });
+ ),
+ })?;
// Contract: caller must check is_null_at() before calling get_decimal.
- // Calling on null value violates the contract and returns garbage data
debug_assert!(
!array.is_null(self.row_id),
"get_decimal called on null value at pos {} row {}",
@@ -290,12 +338,16 @@ impl InternalRow for ColumnarRow {
self.row_id
);
- // Read scale from Arrow schema field metadata
- let schema = self.record_batch.schema();
- let field = schema.field(pos);
- let arrow_scale = match field.data_type() {
+ // Read scale from Arrow column data type
+ let arrow_scale = match column.data_type() {
DataType::Decimal128(_p, s) => *s as i64,
- dt => panic!("Expected Decimal128 data type at column {pos},
found: {dt:?}"),
+ dt => {
+ return Err(IllegalArgument {
+ message: format!(
+ "expected Decimal128 data type at column {pos}, found:
{dt:?}"
+ ),
+ });
+ }
};
let i128_val = array.value(self.row_id);
@@ -307,60 +359,53 @@ impl InternalRow for ColumnarRow {
precision as u32,
scale as u32,
)
- .unwrap_or_else(|e| {
- panic!(
- "Failed to create Decimal at column {} row {}: {}",
- pos, self.row_id, e
- )
- })
}
- fn get_date(&self, pos: usize) -> crate::row::datum::Date {
- crate::row::datum::Date::new(self.read_date_from_arrow(pos))
+ fn get_date(&self, pos: usize) -> Result<Date> {
+ Ok(Date::new(self.read_date_from_arrow(pos)?))
}
- fn get_time(&self, pos: usize) -> crate::row::datum::Time {
- crate::row::datum::Time::new(self.read_time_from_arrow(pos))
+ fn get_time(&self, pos: usize) -> Result<Time> {
+ Ok(Time::new(self.read_time_from_arrow(pos)?))
}
- fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
crate::row::datum::TimestampNtz {
- // Like Java's ArrowTimestampNtzColumnVector, we ignore the precision
parameter
- // and determine the conversion from the Arrow column's TimeUnit.
+ fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz> {
self.read_timestamp_from_arrow(
pos,
precision,
- crate::row::datum::TimestampNtz::new,
- crate::row::datum::TimestampNtz::from_millis_nanos,
+ TimestampNtz::new,
+ TimestampNtz::from_millis_nanos,
)
}
- fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
crate::row::datum::TimestampLtz {
- // Like Java's ArrowTimestampLtzColumnVector, we ignore the precision
parameter
- // and determine the conversion from the Arrow column's TimeUnit.
+ fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz> {
self.read_timestamp_from_arrow(
pos,
precision,
- crate::row::datum::TimestampLtz::new,
- crate::row::datum::TimestampLtz::from_millis_nanos,
+ TimestampLtz::new,
+ TimestampLtz::from_millis_nanos,
)
}
- fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<FixedSizeBinaryArray>()
- .expect("Expected binary array.")
- .value(self.row_id)
+ fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
+ Ok(self
+ .column(pos)?
+ .as_fixed_size_binary_opt()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected binary array at position {pos}"),
+ })?
+ .value(self.row_id))
}
- fn get_bytes(&self, pos: usize) -> &[u8] {
- self.record_batch
- .column(pos)
+ fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
+ Ok(self
+ .column(pos)?
.as_any()
.downcast_ref::<BinaryArray>()
- .expect("Expected bytes array.")
- .value(self.row_id)
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected bytes array at position {pos}"),
+ })?
+ .value(self.row_id))
}
}
@@ -368,8 +413,8 @@ impl InternalRow for ColumnarRow {
mod tests {
use super::*;
use arrow::array::{
- BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array,
Int16Array, Int32Array,
- Int64Array, StringArray,
+ BinaryArray, BooleanArray, Decimal128Array, Float32Array,
Float64Array, Int8Array,
+ Int16Array, Int32Array, Int64Array, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};
@@ -407,16 +452,16 @@ mod tests {
let mut row = ColumnarRow::new(Arc::new(batch));
assert_eq!(row.get_field_count(), 10);
- assert!(row.get_boolean(0));
- assert_eq!(row.get_byte(1), 1);
- assert_eq!(row.get_short(2), 2);
- assert_eq!(row.get_int(3), 3);
- assert_eq!(row.get_long(4), 4);
- assert_eq!(row.get_float(5), 1.25);
- assert_eq!(row.get_double(6), 2.5);
- assert_eq!(row.get_string(7), "hello");
- assert_eq!(row.get_bytes(8), b"data");
- assert_eq!(row.get_char(9, 2), "ab");
+ assert!(row.get_boolean(0).unwrap());
+ assert_eq!(row.get_byte(1).unwrap(), 1);
+ assert_eq!(row.get_short(2).unwrap(), 2);
+ assert_eq!(row.get_int(3).unwrap(), 3);
+ assert_eq!(row.get_long(4).unwrap(), 4);
+ assert_eq!(row.get_float(5).unwrap(), 1.25);
+ assert_eq!(row.get_double(6).unwrap(), 2.5);
+ assert_eq!(row.get_string(7).unwrap(), "hello");
+ assert_eq!(row.get_bytes(8).unwrap(), b"data");
+ assert_eq!(row.get_char(9, 2).unwrap(), "ab");
row.set_row_id(0);
assert_eq!(row.get_row_id(), 0);
}
@@ -465,12 +510,12 @@ mod tests {
// Verify decimal values
assert_eq!(
- row.get_decimal(0, 10, 2),
+ row.get_decimal(0, 10, 2).unwrap(),
crate::row::Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2),
10, 2)
.unwrap()
);
assert_eq!(
- row.get_decimal(1, 20, 5),
+ row.get_decimal(1, 20, 5).unwrap(),
crate::row::Decimal::from_big_decimal(
BigDecimal::new(BigInt::from(1234567890), 5),
20,
@@ -479,7 +524,7 @@ mod tests {
.unwrap()
);
assert_eq!(
- row.get_decimal(2, 38, 10),
+ row.get_decimal(2, 38, 10).unwrap(),
crate::row::Decimal::from_big_decimal(
BigDecimal::new(BigInt::from(999999999999999999i128), 10),
38,
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index 2322207..918ebdf 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -16,9 +16,11 @@
// under the License.
use crate::client::WriteFormat;
+use crate::error::Result;
use crate::metadata::RowType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
-use crate::row::{GenericRow, InternalRow};
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use crate::row::{Decimal, GenericRow, InternalRow};
use std::sync::{Arc, OnceLock};
// Reference implementation:
@@ -81,74 +83,80 @@ impl<'a> InternalRow for CompactedRow<'a> {
self.arity
}
- fn is_null_at(&self, pos: usize) -> bool {
- self.deserializer.get_row_type().fields().as_slice()[pos]
- .data_type
- .is_nullable()
- && self.reader.is_null_at(pos)
+ fn is_null_at(&self, pos: usize) -> Result<bool> {
+ let fields = self.deserializer.get_row_type().fields();
+ if pos >= fields.len() {
+ return Err(crate::error::Error::IllegalArgument {
+ message: format!(
+ "position {pos} out of bounds (row has {} fields)",
+ fields.len()
+ ),
+ });
+ }
+ Ok(fields.as_slice()[pos].data_type.is_nullable() &&
self.reader.is_null_at(pos))
}
- fn get_boolean(&self, pos: usize) -> bool {
+ fn get_boolean(&self, pos: usize) -> Result<bool> {
self.decoded_row().get_boolean(pos)
}
- fn get_byte(&self, pos: usize) -> i8 {
+ fn get_byte(&self, pos: usize) -> Result<i8> {
self.decoded_row().get_byte(pos)
}
- fn get_short(&self, pos: usize) -> i16 {
+ fn get_short(&self, pos: usize) -> Result<i16> {
self.decoded_row().get_short(pos)
}
- fn get_int(&self, pos: usize) -> i32 {
+ fn get_int(&self, pos: usize) -> Result<i32> {
self.decoded_row().get_int(pos)
}
- fn get_long(&self, pos: usize) -> i64 {
+ fn get_long(&self, pos: usize) -> Result<i64> {
self.decoded_row().get_long(pos)
}
- fn get_float(&self, pos: usize) -> f32 {
+ fn get_float(&self, pos: usize) -> Result<f32> {
self.decoded_row().get_float(pos)
}
- fn get_double(&self, pos: usize) -> f64 {
+ fn get_double(&self, pos: usize) -> Result<f64> {
self.decoded_row().get_double(pos)
}
- fn get_char(&self, pos: usize, length: usize) -> &str {
+ fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
self.decoded_row().get_char(pos, length)
}
- fn get_string(&self, pos: usize) -> &str {
+ fn get_string(&self, pos: usize) -> Result<&str> {
self.decoded_row().get_string(pos)
}
- fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
crate::row::Decimal {
+ fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
Result<Decimal> {
self.decoded_row().get_decimal(pos, precision, scale)
}
- fn get_date(&self, pos: usize) -> crate::row::datum::Date {
+ fn get_date(&self, pos: usize) -> Result<Date> {
self.decoded_row().get_date(pos)
}
- fn get_time(&self, pos: usize) -> crate::row::datum::Time {
+ fn get_time(&self, pos: usize) -> Result<Time> {
self.decoded_row().get_time(pos)
}
- fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
crate::row::datum::TimestampNtz {
+ fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz> {
self.decoded_row().get_timestamp_ntz(pos, precision)
}
- fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
crate::row::datum::TimestampLtz {
+ fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz> {
self.decoded_row().get_timestamp_ltz(pos, precision)
}
- fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+ fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
self.decoded_row().get_binary(pos, length)
}
- fn get_bytes(&self, pos: usize) -> &[u8] {
+ fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
self.decoded_row().get_bytes(pos)
}
@@ -203,15 +211,15 @@ mod tests {
let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
assert_eq!(row.get_field_count(), 9);
- assert!(row.get_boolean(0));
- assert_eq!(row.get_byte(1), 1);
- assert_eq!(row.get_short(2), 100);
- assert_eq!(row.get_int(3), 1000);
- assert_eq!(row.get_long(4), 10000);
- assert_eq!(row.get_float(5), 1.5);
- assert_eq!(row.get_double(6), 2.5);
- assert_eq!(row.get_string(7), "Hello World");
- assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
+ assert!(row.get_boolean(0).unwrap());
+ assert_eq!(row.get_byte(1).unwrap(), 1);
+ assert_eq!(row.get_short(2).unwrap(), 100);
+ assert_eq!(row.get_int(3).unwrap(), 1000);
+ assert_eq!(row.get_long(4).unwrap(), 10000);
+ assert_eq!(row.get_float(5).unwrap(), 1.5);
+ assert_eq!(row.get_double(6).unwrap(), 2.5);
+ assert_eq!(row.get_string(7).unwrap(), "Hello World");
+ assert_eq!(row.get_bytes(8).unwrap(), &[1, 2, 3, 4, 5]);
// Test with nulls and negative values
let row_type = RowType::with_data_types(vec![
@@ -228,13 +236,13 @@ mod tests {
let bytes = writer.to_bytes();
let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
- assert!(!row.is_null_at(0));
- assert!(row.is_null_at(1));
- assert!(!row.is_null_at(2));
- assert_eq!(row.get_int(0), -42);
- assert_eq!(row.get_double(2), 2.71);
+ assert!(!row.is_null_at(0).unwrap());
+ assert!(row.is_null_at(1).unwrap());
+ assert!(!row.is_null_at(2).unwrap());
+ assert_eq!(row.get_int(0).unwrap(), -42);
+ assert_eq!(row.get_double(2).unwrap(), 2.71);
// Verify caching works on repeated reads
- assert_eq!(row.get_int(0), -42);
+ assert_eq!(row.get_int(0).unwrap(), -42);
}
#[test]
@@ -285,30 +293,33 @@ mod tests {
let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
// Verify all values
- assert_eq!(row.get_date(0).get_inner(), 19651);
- assert_eq!(row.get_time(1).get_inner(), 34200000);
- assert_eq!(row.get_timestamp_ntz(2, 3).get_millisecond(),
1698235273182);
+ assert_eq!(row.get_date(0).unwrap().get_inner(), 19651);
+ assert_eq!(row.get_time(1).unwrap().get_inner(), 34200000);
+ assert_eq!(
+ row.get_timestamp_ntz(2, 3).unwrap().get_millisecond(),
+ 1698235273182
+ );
assert_eq!(
- row.get_timestamp_ltz(3, 3).get_epoch_millisecond(),
+ row.get_timestamp_ltz(3, 3).unwrap().get_epoch_millisecond(),
1698235273182
);
- let read_ts_ntz = row.get_timestamp_ntz(4, 6);
+ let read_ts_ntz = row.get_timestamp_ntz(4, 6).unwrap();
assert_eq!(read_ts_ntz.get_millisecond(), 1698235273182);
assert_eq!(read_ts_ntz.get_nano_of_millisecond(), 123456);
- let read_ts_ltz = row.get_timestamp_ltz(5, 9);
+ let read_ts_ltz = row.get_timestamp_ltz(5, 9).unwrap();
assert_eq!(read_ts_ltz.get_epoch_millisecond(), 1698235273182);
assert_eq!(read_ts_ltz.get_nano_of_millisecond(), 987654);
// Assert on Decimal equality
- assert_eq!(row.get_decimal(6, 10, 2), small_decimal);
- assert_eq!(row.get_decimal(7, 28, 10), large_decimal);
+ assert_eq!(row.get_decimal(6, 10, 2).unwrap(), small_decimal);
+ assert_eq!(row.get_decimal(7, 28, 10).unwrap(), large_decimal);
// Assert on Decimal components to catch any regressions
- let read_small_decimal = row.get_decimal(6, 10, 2);
+ let read_small_decimal = row.get_decimal(6, 10, 2).unwrap();
assert_eq!(read_small_decimal.precision(), 10);
assert_eq!(read_small_decimal.scale(), 2);
assert_eq!(read_small_decimal.to_unscaled_long().unwrap(), 12345);
- let read_large_decimal = row.get_decimal(7, 28, 10);
+ let read_large_decimal = row.get_decimal(7, 28, 10).unwrap();
assert_eq!(read_large_decimal.precision(), 28);
assert_eq!(read_large_decimal.scale(), 10);
assert_eq!(
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs
b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index 563c1c9..d201450 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -83,7 +83,7 @@ impl KeyEncoder for CompactedKeyEncoder {
// iterate all the fields of the row, and encode each field
for (pos, field_getter) in self.field_getters.iter().enumerate() {
- match &field_getter.get_field(row) {
+ match &field_getter.get_field(row)? {
Datum::Null => {
return Err(IllegalArgument {
message: format!("Cannot encode key with null value at
position: {pos:?}"),
diff --git a/crates/fluss/src/row/field_getter.rs
b/crates/fluss/src/row/field_getter.rs
index cbffa4d..d6b9fc9 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::error::Result;
use crate::metadata::{DataType, RowType};
use crate::row::{Datum, InternalRow};
@@ -24,11 +25,11 @@ pub enum FieldGetter {
NonNullable(InnerFieldGetter),
}
impl FieldGetter {
- pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+ pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Result<Datum<'a>>
{
match self {
FieldGetter::Nullable(getter) => {
- if row.is_null_at(getter.pos()) {
- Datum::Null
+ if row.is_null_at(getter.pos())? {
+ Ok(Datum::Null)
} else {
getter.get_field(row)
}
@@ -151,33 +152,33 @@ pub enum InnerFieldGetter {
}
impl InnerFieldGetter {
- pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
- match self {
- InnerFieldGetter::Char { pos, len } =>
Datum::from(row.get_char(*pos, *len)),
- InnerFieldGetter::String { pos } =>
Datum::from(row.get_string(*pos)),
- InnerFieldGetter::Bool { pos } =>
Datum::from(row.get_boolean(*pos)),
- InnerFieldGetter::Binary { pos, len } =>
Datum::from(row.get_binary(*pos, *len)),
- InnerFieldGetter::Bytes { pos } =>
Datum::from(row.get_bytes(*pos)),
- InnerFieldGetter::TinyInt { pos } =>
Datum::from(row.get_byte(*pos)),
- InnerFieldGetter::SmallInt { pos } =>
Datum::from(row.get_short(*pos)),
- InnerFieldGetter::Int { pos } => Datum::from(row.get_int(*pos)),
- InnerFieldGetter::BigInt { pos } =>
Datum::from(row.get_long(*pos)),
- InnerFieldGetter::Float { pos } =>
Datum::from(row.get_float(*pos)),
- InnerFieldGetter::Double { pos } =>
Datum::from(row.get_double(*pos)),
+ pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Result<Datum<'a>>
{
+ Ok(match self {
+ InnerFieldGetter::Char { pos, len } =>
Datum::from(row.get_char(*pos, *len)?),
+ InnerFieldGetter::String { pos } =>
Datum::from(row.get_string(*pos)?),
+ InnerFieldGetter::Bool { pos } =>
Datum::from(row.get_boolean(*pos)?),
+ InnerFieldGetter::Binary { pos, len } =>
Datum::from(row.get_binary(*pos, *len)?),
+ InnerFieldGetter::Bytes { pos } =>
Datum::from(row.get_bytes(*pos)?),
+ InnerFieldGetter::TinyInt { pos } =>
Datum::from(row.get_byte(*pos)?),
+ InnerFieldGetter::SmallInt { pos } =>
Datum::from(row.get_short(*pos)?),
+ InnerFieldGetter::Int { pos } => Datum::from(row.get_int(*pos)?),
+ InnerFieldGetter::BigInt { pos } =>
Datum::from(row.get_long(*pos)?),
+ InnerFieldGetter::Float { pos } =>
Datum::from(row.get_float(*pos)?),
+ InnerFieldGetter::Double { pos } =>
Datum::from(row.get_double(*pos)?),
InnerFieldGetter::Decimal {
pos,
precision,
scale,
- } => Datum::Decimal(row.get_decimal(*pos, *precision, *scale)),
- InnerFieldGetter::Date { pos } => Datum::Date(row.get_date(*pos)),
- InnerFieldGetter::Time { pos } => Datum::Time(row.get_time(*pos)),
+ } => Datum::Decimal(row.get_decimal(*pos, *precision, *scale)?),
+ InnerFieldGetter::Date { pos } => Datum::Date(row.get_date(*pos)?),
+ InnerFieldGetter::Time { pos } => Datum::Time(row.get_time(*pos)?),
InnerFieldGetter::Timestamp { pos, precision } => {
- Datum::TimestampNtz(row.get_timestamp_ntz(*pos, *precision))
+ Datum::TimestampNtz(row.get_timestamp_ntz(*pos, *precision)?)
}
InnerFieldGetter::TimestampLtz { pos, precision } => {
- Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision))
+ Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?)
} //TODO Array, Map, Row
- }
+ })
}
pub fn pos(&self) -> usize {
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 276dcca..8fb777d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -54,67 +54,69 @@ impl<'a> BinaryRow<'a> {
}
}
-// TODO make functions return Result<?> for better error handling
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+
pub trait InternalRow: Send + Sync {
/// Returns the number of fields in this row
fn get_field_count(&self) -> usize;
/// Returns true if the element is null at the given position
- fn is_null_at(&self, pos: usize) -> bool;
+ fn is_null_at(&self, pos: usize) -> Result<bool>;
/// Returns the boolean value at the given position
- fn get_boolean(&self, pos: usize) -> bool;
+ fn get_boolean(&self, pos: usize) -> Result<bool>;
/// Returns the byte value at the given position
- fn get_byte(&self, pos: usize) -> i8;
+ fn get_byte(&self, pos: usize) -> Result<i8>;
/// Returns the short value at the given position
- fn get_short(&self, pos: usize) -> i16;
+ fn get_short(&self, pos: usize) -> Result<i16>;
/// Returns the integer value at the given position
- fn get_int(&self, pos: usize) -> i32;
+ fn get_int(&self, pos: usize) -> Result<i32>;
/// Returns the long value at the given position
- fn get_long(&self, pos: usize) -> i64;
+ fn get_long(&self, pos: usize) -> Result<i64>;
/// Returns the float value at the given position
- fn get_float(&self, pos: usize) -> f32;
+ fn get_float(&self, pos: usize) -> Result<f32>;
/// Returns the double value at the given position
- fn get_double(&self, pos: usize) -> f64;
+ fn get_double(&self, pos: usize) -> Result<f64>;
/// Returns the string value at the given position with fixed length
- fn get_char(&self, pos: usize, length: usize) -> &str;
+ fn get_char(&self, pos: usize, length: usize) -> Result<&str>;
/// Returns the string value at the given position
- fn get_string(&self, pos: usize) -> &str;
+ fn get_string(&self, pos: usize) -> Result<&str>;
/// Returns the decimal value at the given position
- fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
Decimal;
+ fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
Result<Decimal>;
/// Returns the date value at the given position (date as days since epoch)
- fn get_date(&self, pos: usize) -> datum::Date;
+ fn get_date(&self, pos: usize) -> Result<Date>;
/// Returns the time value at the given position (time as milliseconds
since midnight)
- fn get_time(&self, pos: usize) -> datum::Time;
+ fn get_time(&self, pos: usize) -> Result<Time>;
/// Returns the timestamp value at the given position (timestamp without
timezone)
///
/// The precision is required to determine whether the timestamp value was
stored
/// in a compact representation (precision <= 3) or with nanosecond
precision.
- fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
datum::TimestampNtz;
+ fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz>;
/// Returns the timestamp value at the given position (timestamp with
local timezone)
///
/// The precision is required to determine whether the timestamp value was
stored
/// in a compact representation (precision <= 3) or with nanosecond
precision.
- fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
datum::TimestampLtz;
+ fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz>;
/// Returns the binary value at the given position with fixed length
- fn get_binary(&self, pos: usize, length: usize) -> &[u8];
+ fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]>;
/// Returns the binary value at the given position
- fn get_bytes(&self, pos: usize) -> &[u8];
+ fn get_bytes(&self, pos: usize) -> Result<&[u8]>;
/// Returns encoded bytes if already encoded
fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
@@ -127,98 +129,149 @@ pub struct GenericRow<'a> {
pub values: Vec<Datum<'a>>,
}
+impl<'a> GenericRow<'a> {
+ fn get_value(&self, pos: usize) -> Result<&Datum<'a>> {
+ self.values.get(pos).ok_or_else(|| IllegalArgument {
+ message: format!(
+ "position {pos} out of bounds (row has {} fields)",
+ self.values.len()
+ ),
+ })
+ }
+
+ fn try_convert<T: TryFrom<&'a Datum<'a>>>(
+ &'a self,
+ pos: usize,
+ expected_type: &str,
+ ) -> Result<T> {
+ let datum = self.get_value(pos)?;
+ T::try_from(datum).map_err(|_| IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected {expected_type},
got {datum:?}"
+ ),
+ })
+ }
+}
+
impl<'a> InternalRow for GenericRow<'a> {
fn get_field_count(&self) -> usize {
self.values.len()
}
- fn is_null_at(&self, pos: usize) -> bool {
- self.values
- .get(pos)
- .expect("position out of bounds")
- .is_null()
+ fn is_null_at(&self, pos: usize) -> Result<bool> {
+ Ok(self.get_value(pos)?.is_null())
}
- fn get_boolean(&self, pos: usize) -> bool {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_boolean(&self, pos: usize) -> Result<bool> {
+ self.try_convert(pos, "Boolean")
}
- fn get_byte(&self, pos: usize) -> i8 {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_byte(&self, pos: usize) -> Result<i8> {
+ self.try_convert(pos, "TinyInt")
}
- fn get_short(&self, pos: usize) -> i16 {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_short(&self, pos: usize) -> Result<i16> {
+ self.try_convert(pos, "SmallInt")
}
- fn get_int(&self, pos: usize) -> i32 {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_int(&self, pos: usize) -> Result<i32> {
+ self.try_convert(pos, "Int")
}
- fn get_long(&self, _pos: usize) -> i64 {
- self.values.get(_pos).unwrap().try_into().unwrap()
+ fn get_long(&self, pos: usize) -> Result<i64> {
+ self.try_convert(pos, "BigInt")
}
- fn get_float(&self, pos: usize) -> f32 {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_float(&self, pos: usize) -> Result<f32> {
+ self.try_convert(pos, "Float")
}
- fn get_double(&self, pos: usize) -> f64 {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_double(&self, pos: usize) -> Result<f64> {
+ self.try_convert(pos, "Double")
}
- fn get_char(&self, pos: usize, _length: usize) -> &str {
+ fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
// don't check length, following java client
self.get_string(pos)
}
- fn get_string(&self, pos: usize) -> &str {
- self.values.get(pos).unwrap().try_into().unwrap()
+ fn get_string(&self, pos: usize) -> Result<&str> {
+ self.try_convert(pos, "String")
}
- fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) ->
Decimal {
- match self.values.get(pos).unwrap() {
- Datum::Decimal(d) => d.clone(),
- other => panic!("Expected Decimal at pos {pos:?}, got {other:?}"),
+ fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) ->
Result<Decimal> {
+ match self.get_value(pos)? {
+ Datum::Decimal(d) => Ok(d.clone()),
+ other => Err(IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected Decimal, got
{other:?}"
+ ),
+ }),
}
}
- fn get_date(&self, pos: usize) -> datum::Date {
- match self.values.get(pos).unwrap() {
- Datum::Date(d) => *d,
- Datum::Int32(i) => datum::Date::new(*i),
- other => panic!("Expected Date or Int32 at pos {pos:?}, got
{other:?}"),
+ fn get_date(&self, pos: usize) -> Result<Date> {
+ match self.get_value(pos)? {
+ Datum::Date(d) => Ok(*d),
+ Datum::Int32(i) => Ok(Date::new(*i)),
+ other => Err(IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected Date or Int32,
got {other:?}"
+ ),
+ }),
}
}
- fn get_time(&self, pos: usize) -> datum::Time {
- match self.values.get(pos).unwrap() {
- Datum::Time(t) => *t,
- Datum::Int32(i) => datum::Time::new(*i),
- other => panic!("Expected Time or Int32 at pos {pos:?}, got
{other:?}"),
+ fn get_time(&self, pos: usize) -> Result<Time> {
+ match self.get_value(pos)? {
+ Datum::Time(t) => Ok(*t),
+ Datum::Int32(i) => Ok(Time::new(*i)),
+ other => Err(IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected Time or Int32,
got {other:?}"
+ ),
+ }),
}
}
- fn get_timestamp_ntz(&self, pos: usize, _precision: u32) ->
datum::TimestampNtz {
- match self.values.get(pos).unwrap() {
- Datum::TimestampNtz(t) => *t,
- other => panic!("Expected TimestampNtz at pos {pos:?}, got
{other:?}"),
+ fn get_timestamp_ntz(&self, pos: usize, _precision: u32) ->
Result<TimestampNtz> {
+ match self.get_value(pos)? {
+ Datum::TimestampNtz(t) => Ok(*t),
+ other => Err(IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected TimestampNtz,
got {other:?}"
+ ),
+ }),
}
}
- fn get_timestamp_ltz(&self, pos: usize, _precision: u32) ->
datum::TimestampLtz {
- match self.values.get(pos).unwrap() {
- Datum::TimestampLtz(t) => *t,
- other => panic!("Expected TimestampLtz at pos {pos:?}, got
{other:?}"),
+ fn get_timestamp_ltz(&self, pos: usize, _precision: u32) ->
Result<TimestampLtz> {
+ match self.get_value(pos)? {
+ Datum::TimestampLtz(t) => Ok(*t),
+ other => Err(IllegalArgument {
+ message: format!(
+ "type mismatch at position {pos}: expected TimestampLtz,
got {other:?}"
+ ),
+ }),
}
}
- fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
- self.values.get(pos).unwrap().as_blob()
+ fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
+ match self.get_value(pos)? {
+ Datum::Blob(b) => Ok(b.as_ref()),
+ other => Err(IllegalArgument {
+ message: format!("type mismatch at position {pos}: expected
Binary, got {other:?}"),
+ }),
+ }
}
- fn get_bytes(&self, pos: usize) -> &[u8] {
- self.values.get(pos).unwrap().as_blob()
+ fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
+ match self.get_value(pos)? {
+ Datum::Blob(b) => Ok(b.as_ref()),
+ other => Err(IllegalArgument {
+ message: format!("type mismatch at position {pos}: expected
Bytes, got {other:?}"),
+ }),
+ }
}
}
@@ -268,17 +321,27 @@ mod tests {
row.set_field(0, Datum::Null);
row.set_field(1, 42_i32);
- assert!(row.is_null_at(0));
- assert!(!row.is_null_at(1));
+ assert!(row.is_null_at(0).unwrap());
+ assert!(!row.is_null_at(1).unwrap());
+ }
+
+ #[test]
+ fn is_null_at_out_of_bounds_returns_error() {
+ let row = GenericRow::from_data(vec![42_i32]);
+ let err = row.is_null_at(5).unwrap_err();
+ assert!(
+ err.to_string().contains("out of bounds"),
+ "Expected out of bounds error, got: {err}"
+ );
}
#[test]
fn new_initializes_nulls() {
let row = GenericRow::new(3);
assert_eq!(row.get_field_count(), 3);
- assert!(row.is_null_at(0));
- assert!(row.is_null_at(1));
- assert!(row.is_null_at(2));
+ assert!(row.is_null_at(0).unwrap());
+ assert!(row.is_null_at(1).unwrap());
+ assert!(row.is_null_at(2).unwrap());
}
#[test]
@@ -288,8 +351,28 @@ mod tests {
row.set_field(0, 123_i32);
// Fields 1 and 2 remain null
assert_eq!(row.get_field_count(), 3);
- assert_eq!(row.get_int(0), 123);
- assert!(row.is_null_at(1));
- assert!(row.is_null_at(2));
+ assert_eq!(row.get_int(0).unwrap(), 123);
+ assert!(row.is_null_at(1).unwrap());
+ assert!(row.is_null_at(2).unwrap());
+ }
+
+ #[test]
+ fn type_mismatch_returns_error() {
+ let row = GenericRow::from_data(vec![Datum::Int64(999)]);
+ let err = row.get_string(0).unwrap_err();
+ assert!(
+ err.to_string().contains("type mismatch"),
+ "Expected type mismatch error, got: {err}"
+ );
+ }
+
+ #[test]
+ fn out_of_bounds_returns_error() {
+ let row = GenericRow::from_data(vec![42_i32]);
+ let err = row.get_int(5).unwrap_err();
+ assert!(
+ err.to_string().contains("out of bounds"),
+ "Expected out of bounds error, got: {err}"
+ );
}
}
diff --git a/crates/fluss/src/row/row_decoder.rs
b/crates/fluss/src/row/row_decoder.rs
index 9f9b421..aea8c86 100644
--- a/crates/fluss/src/row/row_decoder.rs
+++ b/crates/fluss/src/row/row_decoder.rs
@@ -112,8 +112,8 @@ mod tests {
// Verify
assert_eq!(row.get_field_count(), 2);
- assert_eq!(row.get_int(0), 42);
- assert_eq!(row.get_string(1), "hello");
+ assert_eq!(row.get_int(0).unwrap(), 42);
+ assert_eq!(row.get_string(1).unwrap(), "hello");
}
#[test]
@@ -131,7 +131,7 @@ mod tests {
let row = decoder.decode(&data);
// Verify
- assert_eq!(row.get_int(0), 100);
- assert_eq!(row.get_string(1), "world");
+ assert_eq!(row.get_int(0).unwrap(), 100);
+ assert_eq!(row.get_string(1).unwrap(), "world");
}
}
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index ab5f5b6..c101a18 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -64,7 +64,7 @@ mod kv_table_test {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().await.unwrap();
let table_path = TablePath::new("fluss", "test_upsert_and_lookup");
@@ -83,10 +83,7 @@ mod kv_table_test {
create_table(&admin, &table_path, &table_descriptor).await;
- let table = connection
- .get_table(&table_path)
- .await
- .expect("Failed to get table");
+ let table = connection.get_table(&table_path).await.unwrap();
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
let upsert_writer = table_upsert
@@ -118,14 +115,11 @@ mod kv_table_test {
.lookup(&make_key(*id))
.await
.expect("Failed to lookup");
- let row = result
- .get_single_row()
- .expect("Failed to get row")
- .expect("Row should exist");
+ let row = result.get_single_row().unwrap().expect("Row should
exist");
- assert_eq!(row.get_int(0), *id, "id mismatch");
- assert_eq!(row.get_string(1), *expected_name, "name mismatch");
- assert_eq!(row.get_long(2), *expected_age, "age mismatch");
+ assert_eq!(row.get_int(0).unwrap(), *id, "id mismatch");
+ assert_eq!(row.get_string(1).unwrap(), *expected_name, "name
mismatch");
+ assert_eq!(row.get_long(2).unwrap(), *expected_age, "age
mismatch");
}
// Update the record with new age (await acknowledgment)
@@ -144,18 +138,15 @@ mod kv_table_test {
.lookup(&make_key(1))
.await
.expect("Failed to lookup after update");
- let found_row = result
- .get_single_row()
- .expect("Failed to get row")
- .expect("Row should exist");
+ let found_row = result.get_single_row().unwrap().expect("Row should
exist");
assert_eq!(
- found_row.get_long(2),
- updated_row.get_long(2),
+ found_row.get_long(2).unwrap(),
+ updated_row.get_long(2).unwrap(),
"Age should be updated"
);
assert_eq!(
- found_row.get_string(1),
- updated_row.get_string(1),
+ found_row.get_string(1).unwrap(),
+ updated_row.get_string(1).unwrap(),
"Name should remain unchanged"
);
@@ -174,10 +165,7 @@ mod kv_table_test {
.await
.expect("Failed to lookup deleted record");
assert!(
- result
- .get_single_row()
- .expect("Failed to get row")
- .is_none(),
+ result.get_single_row().unwrap().is_none(),
"Record 1 should not exist after delete"
);
@@ -188,10 +176,7 @@ mod kv_table_test {
.await
.expect("Failed to lookup");
assert!(
- result
- .get_single_row()
- .expect("Failed to get row")
- .is_some(),
+ result.get_single_row().unwrap().is_some(),
"Record {} should still exist after deleting record 1",
i
);
@@ -203,10 +188,7 @@ mod kv_table_test {
.await
.expect("Failed to lookup non-existent key");
assert!(
- result
- .get_single_row()
- .expect("Failed to get row")
- .is_none(),
+ result.get_single_row().unwrap().is_none(),
"Non-existent key should return None"
);
@@ -221,7 +203,7 @@ mod kv_table_test {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().await.unwrap();
let table_path = TablePath::new("fluss", "test_composite_pk");
@@ -240,10 +222,7 @@ mod kv_table_test {
create_table(&admin, &table_path, &table_descriptor).await;
- let table = connection
- .get_table(&table_path)
- .await
- .expect("Failed to get table");
+ let table = connection.get_table(&table_path).await.unwrap();
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
let upsert_writer = table_upsert
@@ -279,22 +258,24 @@ mod kv_table_test {
key.set_field(0, "US");
key.set_field(1, 1);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
- let row = result
- .get_single_row()
- .expect("Failed to get row")
- .expect("Row should exist");
- assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
+ let row = result.get_single_row().unwrap().expect("Row should exist");
+ assert_eq!(
+ row.get_long(2).unwrap(),
+ 100,
+ "Score for (US, 1) should be 100"
+ );
// Lookup (EU, 2) - should return score 250
let mut key = GenericRow::new(3);
key.set_field(0, "EU");
key.set_field(1, 2);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
- let row = result
- .get_single_row()
- .expect("Failed to get row")
- .expect("Row should exist");
- assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
+ let row = result.get_single_row().unwrap().expect("Row should exist");
+ assert_eq!(
+ row.get_long(2).unwrap(),
+ 250,
+ "Score for (EU, 2) should be 250"
+ );
// Update (US, 1) score (await acknowledgment)
let mut update_row = GenericRow::new(3);
@@ -312,13 +293,10 @@ mod kv_table_test {
key.set_field(0, "US");
key.set_field(1, 1);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
- let row = result
- .get_single_row()
- .expect("Failed to get row")
- .expect("Row should exist");
+ let row = result.get_single_row().unwrap().expect("Row should exist");
assert_eq!(
- row.get_long(2),
- update_row.get_long(2),
+ row.get_long(2).unwrap(),
+ update_row.get_long(2).unwrap(),
"Row score should be updated"
);
@@ -393,10 +371,10 @@ mod kv_table_test {
.expect("Failed to get row")
.expect("Row should exist");
- assert_eq!(found_row.get_int(0), 1);
- assert_eq!(found_row.get_string(1), "Verso");
- assert_eq!(found_row.get_long(2), 32i64);
- assert_eq!(found_row.get_long(3), 6942i64);
+ assert_eq!(found_row.get_int(0).unwrap(), 1);
+ assert_eq!(found_row.get_string(1).unwrap(), "Verso");
+ assert_eq!(found_row.get_long(2).unwrap(), 32i64);
+ assert_eq!(found_row.get_long(3).unwrap(), 6942i64);
// Create partial update writer to update only score column
let partial_upsert = table_upsert
@@ -428,14 +406,22 @@ mod kv_table_test {
.expect("Failed to get row")
.expect("Row should exist");
- assert_eq!(found_row.get_int(0), 1, "id should remain 1");
+ assert_eq!(found_row.get_int(0).unwrap(), 1, "id should remain 1");
assert_eq!(
- found_row.get_string(1),
+ found_row.get_string(1).unwrap(),
"Verso",
"name should remain unchanged"
);
- assert_eq!(found_row.get_long(2), 32, "age should remain unchanged");
- assert_eq!(found_row.get_long(3), 420, "score should be updated to
420");
+ assert_eq!(
+ found_row.get_long(2).unwrap(),
+ 32,
+ "age should remain unchanged"
+ );
+ assert_eq!(
+ found_row.get_long(3).unwrap(),
+ 420,
+ "score should be updated to 420"
+ );
admin
.drop_table(&table_path, false)
@@ -524,10 +510,10 @@ mod kv_table_test {
.expect("Failed to get row")
.expect("Row should exist");
- assert_eq!(row.get_string(0), *region, "region mismatch");
- assert_eq!(row.get_int(1), *user_id, "user_id mismatch");
- assert_eq!(row.get_string(2), *expected_name, "name mismatch");
- assert_eq!(row.get_long(3), *expected_score, "score mismatch");
+ assert_eq!(row.get_string(0).unwrap(), *region, "region mismatch");
+ assert_eq!(row.get_int(1).unwrap(), *user_id, "user_id mismatch");
+ assert_eq!(row.get_string(2).unwrap(), *expected_name, "name
mismatch");
+ assert_eq!(row.get_long(3).unwrap(), *expected_score, "score
mismatch");
}
// Test update within a partition (await acknowledgment)
@@ -551,8 +537,8 @@ mod kv_table_test {
.get_single_row()
.expect("Failed to get row")
.expect("Row should exist");
- assert_eq!(row.get_string(2), "Gustave Updated");
- assert_eq!(row.get_long(3), 999);
+ assert_eq!(row.get_string(2).unwrap(), "Gustave Updated");
+ assert_eq!(row.get_long(3).unwrap(), 999);
// Lookup in non-existent partition should return empty result
let mut non_existent_key = GenericRow::new(4);
@@ -602,7 +588,7 @@ mod kv_table_test {
.get_single_row()
.expect("Failed to get row")
.expect("Row should exist");
- assert_eq!(row.get_string(2), "Maelle");
+ assert_eq!(row.get_string(2).unwrap(), "Maelle");
admin
.drop_table(&table_path, false)
@@ -732,62 +718,88 @@ mod kv_table_test {
.expect("Row should exist");
// Verify all datatypes
- assert_eq!(found_row.get_int(0), pk_int, "pk_int mismatch");
+ assert_eq!(found_row.get_int(0).unwrap(), pk_int, "pk_int mismatch");
assert_eq!(
- found_row.get_boolean(1),
+ found_row.get_boolean(1).unwrap(),
col_boolean,
"col_boolean mismatch"
);
- assert_eq!(found_row.get_byte(2), col_tinyint, "col_tinyint mismatch");
assert_eq!(
- found_row.get_short(3),
+ found_row.get_byte(2).unwrap(),
+ col_tinyint,
+ "col_tinyint mismatch"
+ );
+ assert_eq!(
+ found_row.get_short(3).unwrap(),
col_smallint,
"col_smallint mismatch"
);
- assert_eq!(found_row.get_int(4), col_int, "col_int mismatch");
- assert_eq!(found_row.get_long(5), col_bigint, "col_bigint mismatch");
+ assert_eq!(found_row.get_int(4).unwrap(), col_int, "col_int mismatch");
+ assert_eq!(
+ found_row.get_long(5).unwrap(),
+ col_bigint,
+ "col_bigint mismatch"
+ );
assert!(
- (found_row.get_float(6) - col_float).abs() < f32::EPSILON,
+ (found_row.get_float(6).unwrap() - col_float).abs() < f32::EPSILON,
"col_float mismatch: expected {}, got {}",
col_float,
- found_row.get_float(6)
+ found_row.get_float(6).unwrap()
);
assert!(
- (found_row.get_double(7) - col_double).abs() < f64::EPSILON,
+ (found_row.get_double(7).unwrap() - col_double).abs() <
f64::EPSILON,
"col_double mismatch: expected {}, got {}",
col_double,
- found_row.get_double(7)
+ found_row.get_double(7).unwrap()
+ );
+ assert_eq!(
+ found_row.get_char(8, 10).unwrap(),
+ col_char,
+ "col_char mismatch"
+ );
+ assert_eq!(
+ found_row.get_string(9).unwrap(),
+ col_string,
+ "col_string mismatch"
);
- assert_eq!(found_row.get_char(8, 10), col_char, "col_char mismatch");
- assert_eq!(found_row.get_string(9), col_string, "col_string mismatch");
assert_eq!(
- found_row.get_decimal(10, 10, 2),
+ found_row.get_decimal(10, 10, 2).unwrap(),
col_decimal,
"col_decimal mismatch"
);
assert_eq!(
- found_row.get_date(11).get_inner(),
+ found_row.get_date(11).unwrap().get_inner(),
col_date.get_inner(),
"col_date mismatch"
);
assert_eq!(
- found_row.get_time(12).get_inner(),
+ found_row.get_time(12).unwrap().get_inner(),
col_time.get_inner(),
"col_time mismatch"
);
assert_eq!(
- found_row.get_timestamp_ntz(13, 6).get_millisecond(),
+ found_row
+ .get_timestamp_ntz(13, 6)
+ .unwrap()
+ .get_millisecond(),
col_timestamp.get_millisecond(),
"col_timestamp mismatch"
);
assert_eq!(
- found_row.get_timestamp_ltz(14, 6).get_epoch_millisecond(),
+ found_row
+ .get_timestamp_ltz(14, 6)
+ .unwrap()
+ .get_epoch_millisecond(),
col_timestamp_ltz.get_epoch_millisecond(),
"col_timestamp_ltz mismatch"
);
- assert_eq!(found_row.get_bytes(15), col_bytes, "col_bytes mismatch");
assert_eq!(
- found_row.get_binary(16, 20),
+ found_row.get_bytes(15).unwrap(),
+ col_bytes,
+ "col_bytes mismatch"
+ );
+ assert_eq!(
+ found_row.get_binary(16, 20).unwrap(),
col_binary,
"col_binary mismatch"
);
@@ -830,29 +842,75 @@ mod kv_table_test {
.expect("Row should exist");
// Verify all nullable columns are null
- assert_eq!(found_row_nulls.get_int(0), pk_int_2, "pk_int mismatch");
- assert!(found_row_nulls.is_null_at(1), "col_boolean should be null");
- assert!(found_row_nulls.is_null_at(2), "col_tinyint should be null");
- assert!(found_row_nulls.is_null_at(3), "col_smallint should be null");
- assert!(found_row_nulls.is_null_at(4), "col_int should be null");
- assert!(found_row_nulls.is_null_at(5), "col_bigint should be null");
- assert!(found_row_nulls.is_null_at(6), "col_float should be null");
- assert!(found_row_nulls.is_null_at(7), "col_double should be null");
- assert!(found_row_nulls.is_null_at(8), "col_char should be null");
- assert!(found_row_nulls.is_null_at(9), "col_string should be null");
- assert!(found_row_nulls.is_null_at(10), "col_decimal should be null");
- assert!(found_row_nulls.is_null_at(11), "col_date should be null");
- assert!(found_row_nulls.is_null_at(12), "col_time should be null");
+ assert_eq!(
+ found_row_nulls.get_int(0).unwrap(),
+ pk_int_2,
+ "pk_int mismatch"
+ );
+ assert!(
+ found_row_nulls.is_null_at(1).unwrap(),
+ "col_boolean should be null"
+ );
assert!(
- found_row_nulls.is_null_at(13),
+ found_row_nulls.is_null_at(2).unwrap(),
+ "col_tinyint should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(3).unwrap(),
+ "col_smallint should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(4).unwrap(),
+ "col_int should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(5).unwrap(),
+ "col_bigint should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(6).unwrap(),
+ "col_float should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(7).unwrap(),
+ "col_double should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(8).unwrap(),
+ "col_char should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(9).unwrap(),
+ "col_string should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(10).unwrap(),
+ "col_decimal should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(11).unwrap(),
+ "col_date should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(12).unwrap(),
+ "col_time should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(13).unwrap(),
"col_timestamp should be null"
);
assert!(
- found_row_nulls.is_null_at(14),
+ found_row_nulls.is_null_at(14).unwrap(),
"col_timestamp_ltz should be null"
);
- assert!(found_row_nulls.is_null_at(15), "col_bytes should be null");
- assert!(found_row_nulls.is_null_at(16), "col_binary should be null");
+ assert!(
+ found_row_nulls.is_null_at(15).unwrap(),
+ "col_bytes should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(16).unwrap(),
+ "col_binary should be null"
+ );
admin
.drop_table(&table_path, false)
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index eac72e5..779ffdd 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -138,7 +138,10 @@ mod table_test {
.expect("Failed to poll records");
for rec in scan_records {
let row = rec.row();
- collected.push((row.get_int(0),
row.get_string(1).to_string()));
+ collected.push((
+ row.get_int(0).unwrap(),
+ row.get_string(1).unwrap().to_string(),
+ ));
}
}
@@ -362,13 +365,13 @@ mod table_test {
let row = record.row();
// col_b is now at index 0, col_c is at index 1
assert_eq!(
- row.get_string(0),
+ row.get_string(0).unwrap(),
expected_col_b[i],
"col_b mismatch at index {}",
i
);
assert_eq!(
- row.get_int(1),
+ row.get_int(1).unwrap(),
expected_col_c[i],
"col_c mismatch at index {}",
i
@@ -394,13 +397,13 @@ mod table_test {
let row = record.row();
// col_b is now at index 0, col_c is at index 1
assert_eq!(
- row.get_string(0),
+ row.get_string(0).unwrap(),
expected_col_b[i],
"col_b mismatch at index {}",
i
);
assert_eq!(
- row.get_int(1),
+ row.get_int(1).unwrap(),
expected_col_a[i],
"col_c mismatch at index {}",
i
@@ -777,81 +780,103 @@ mod table_test {
assert_eq!(records.len(), 2, "Expected 2 records");
let found_row = records[0].row();
- assert_eq!(found_row.get_byte(0), col_tinyint, "col_tinyint mismatch");
assert_eq!(
- found_row.get_short(1),
+ found_row.get_byte(0).unwrap(),
+ col_tinyint,
+ "col_tinyint mismatch"
+ );
+ assert_eq!(
+ found_row.get_short(1).unwrap(),
col_smallint,
"col_smallint mismatch"
);
- assert_eq!(found_row.get_int(2), col_int, "col_int mismatch");
- assert_eq!(found_row.get_long(3), col_bigint, "col_bigint mismatch");
+ assert_eq!(found_row.get_int(2).unwrap(), col_int, "col_int mismatch");
+ assert_eq!(
+ found_row.get_long(3).unwrap(),
+ col_bigint,
+ "col_bigint mismatch"
+ );
assert!(
- (found_row.get_float(4) - col_float).abs() < f32::EPSILON,
+ (found_row.get_float(4).unwrap() - col_float).abs() < f32::EPSILON,
"col_float mismatch: expected {}, got {}",
col_float,
- found_row.get_float(4)
+ found_row.get_float(4).unwrap()
);
assert!(
- (found_row.get_double(5) - col_double).abs() < f64::EPSILON,
+ (found_row.get_double(5).unwrap() - col_double).abs() <
f64::EPSILON,
"col_double mismatch: expected {}, got {}",
col_double,
- found_row.get_double(5)
+ found_row.get_double(5).unwrap()
);
assert_eq!(
- found_row.get_boolean(6),
+ found_row.get_boolean(6).unwrap(),
col_boolean,
"col_boolean mismatch"
);
- assert_eq!(found_row.get_char(7, 10), col_char, "col_char mismatch");
- assert_eq!(found_row.get_string(8), col_string, "col_string mismatch");
assert_eq!(
- found_row.get_decimal(9, 10, 2),
+ found_row.get_char(7, 10).unwrap(),
+ col_char,
+ "col_char mismatch"
+ );
+ assert_eq!(
+ found_row.get_string(8).unwrap(),
+ col_string,
+ "col_string mismatch"
+ );
+ assert_eq!(
+ found_row.get_decimal(9, 10, 2).unwrap(),
col_decimal,
"col_decimal mismatch"
);
assert_eq!(
- found_row.get_date(10).get_inner(),
+ found_row.get_date(10).unwrap().get_inner(),
col_date.get_inner(),
"col_date mismatch"
);
assert_eq!(
- found_row.get_time(11).get_inner(),
+ found_row.get_time(11).unwrap().get_inner(),
col_time_s.get_inner(),
"col_time_s mismatch"
);
assert_eq!(
- found_row.get_time(12).get_inner(),
+ found_row.get_time(12).unwrap().get_inner(),
col_time_ms.get_inner(),
"col_time_ms mismatch"
);
assert_eq!(
- found_row.get_time(13).get_inner(),
+ found_row.get_time(13).unwrap().get_inner(),
col_time_us.get_inner(),
"col_time_us mismatch"
);
assert_eq!(
- found_row.get_time(14).get_inner(),
+ found_row.get_time(14).unwrap().get_inner(),
col_time_ns.get_inner(),
"col_time_ns mismatch"
);
assert_eq!(
- found_row.get_timestamp_ntz(15, 0).get_millisecond(),
+ found_row
+ .get_timestamp_ntz(15, 0)
+ .unwrap()
+ .get_millisecond(),
col_timestamp_s.get_millisecond(),
"col_timestamp_s mismatch"
);
assert_eq!(
- found_row.get_timestamp_ntz(16, 3).get_millisecond(),
+ found_row
+ .get_timestamp_ntz(16, 3)
+ .unwrap()
+ .get_millisecond(),
col_timestamp_ms.get_millisecond(),
"col_timestamp_ms mismatch"
);
- let read_ts_us = found_row.get_timestamp_ntz(17, 6);
+ let read_ts_us = found_row.get_timestamp_ntz(17, 6).unwrap();
assert_eq!(
read_ts_us.get_millisecond(),
col_timestamp_us.get_millisecond(),
@@ -863,7 +888,7 @@ mod table_test {
"col_timestamp_us nanos mismatch"
);
- let read_ts_ns = found_row.get_timestamp_ntz(18, 9);
+ let read_ts_ns = found_row.get_timestamp_ntz(18, 9).unwrap();
assert_eq!(
read_ts_ns.get_millisecond(),
col_timestamp_ns.get_millisecond(),
@@ -876,18 +901,24 @@ mod table_test {
);
assert_eq!(
- found_row.get_timestamp_ltz(19, 0).get_epoch_millisecond(),
+ found_row
+ .get_timestamp_ltz(19, 0)
+ .unwrap()
+ .get_epoch_millisecond(),
col_timestamp_ltz_s.get_epoch_millisecond(),
"col_timestamp_ltz_s mismatch"
);
assert_eq!(
- found_row.get_timestamp_ltz(20, 3).get_epoch_millisecond(),
+ found_row
+ .get_timestamp_ltz(20, 3)
+ .unwrap()
+ .get_epoch_millisecond(),
col_timestamp_ltz_ms.get_epoch_millisecond(),
"col_timestamp_ltz_ms mismatch"
);
- let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6);
+ let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6).unwrap();
assert_eq!(
read_ts_ltz_us.get_epoch_millisecond(),
col_timestamp_ltz_us.get_epoch_millisecond(),
@@ -899,7 +930,7 @@ mod table_test {
"col_timestamp_ltz_us nanos mismatch"
);
- let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9);
+ let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9).unwrap();
assert_eq!(
read_ts_ltz_ns.get_epoch_millisecond(),
col_timestamp_ltz_ns.get_epoch_millisecond(),
@@ -910,15 +941,19 @@ mod table_test {
col_timestamp_ltz_ns.get_nano_of_millisecond(),
"col_timestamp_ltz_ns nanos mismatch"
);
- assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch");
assert_eq!(
- found_row.get_binary(24, 4),
+ found_row.get_bytes(23).unwrap(),
+ col_bytes,
+ "col_bytes mismatch"
+ );
+ assert_eq!(
+ found_row.get_binary(24, 4).unwrap(),
col_binary,
"col_binary mismatch"
);
// Verify timestamps before Unix epoch (negative timestamps)
- let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6);
+ let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6).unwrap();
assert_eq!(
read_ts_us_neg.get_millisecond(),
col_timestamp_us_neg.get_millisecond(),
@@ -930,7 +965,7 @@ mod table_test {
"col_timestamp_us_neg nanos mismatch"
);
- let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9);
+ let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9).unwrap();
assert_eq!(
read_ts_ns_neg.get_millisecond(),
col_timestamp_ns_neg.get_millisecond(),
@@ -942,7 +977,7 @@ mod table_test {
"col_timestamp_ns_neg nanos mismatch"
);
- let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6);
+ let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6).unwrap();
assert_eq!(
read_ts_ltz_us_neg.get_epoch_millisecond(),
col_timestamp_ltz_us_neg.get_epoch_millisecond(),
@@ -954,7 +989,7 @@ mod table_test {
"col_timestamp_ltz_us_neg nanos mismatch"
);
- let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9);
+ let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9).unwrap();
assert_eq!(
read_ts_ltz_ns_neg.get_epoch_millisecond(),
col_timestamp_ltz_ns_neg.get_epoch_millisecond(),
@@ -969,7 +1004,11 @@ mod table_test {
// Verify row with all nulls (record index 1)
let found_row_nulls = records[1].row();
for i in 0..field_count {
- assert!(found_row_nulls.is_null_at(i), "column {} should be null",
i);
+ assert!(
+ found_row_nulls.is_null_at(i).unwrap(),
+ "column {} should be null",
+ i
+ );
}
admin
@@ -1140,9 +1179,9 @@ mod table_test {
for rec in records {
let row = rec.row();
collected_records.push((
- row.get_int(0),
- row.get_string(1).to_string(),
- row.get_long(2),
+ row.get_int(0).unwrap(),
+ row.get_string(1).unwrap().to_string(),
+ row.get_long(2).unwrap(),
));
}
}
@@ -1196,9 +1235,9 @@ mod table_test {
for rec in records {
let row = rec.row();
records_after_unsubscribe.push((
- row.get_int(0),
- row.get_string(1).to_string(),
- row.get_long(2),
+ row.get_int(0).unwrap(),
+ row.get_string(1).unwrap().to_string(),
+ row.get_long(2).unwrap(),
));
}
}
@@ -1248,9 +1287,9 @@ mod table_test {
for rec in records {
let row = rec.row();
batch_collected.push((
- row.get_int(0),
- row.get_string(1).to_string(),
- row.get_long(2),
+ row.get_int(0).unwrap(),
+ row.get_string(1).unwrap().to_string(),
+ row.get_long(2).unwrap(),
));
}
}
@@ -1272,4 +1311,73 @@ mod table_test {
.await
.expect("Failed to drop table");
}
+
+ #[tokio::test]
+ async fn undersized_row_returns_error() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss", "test_log_undersized_row");
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("col_bool", DataTypes::boolean())
+ .column("col_int", DataTypes::int())
+ .column("col_string", DataTypes::string())
+ .column("col_bigint", DataTypes::bigint())
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let append_writer = table
+ .new_append()
+ .expect("Failed to create table append")
+ .create_writer()
+ .expect("Failed to create writer");
+
+ // Scenario 1b: GenericRow with only 2 fields for a 4-column table
+ let mut row = fluss::row::GenericRow::new(2);
+ row.set_field(0, true);
+ row.set_field(1, 42_i32);
+
+ let result = append_writer.append(&row);
+ assert!(result.is_err(), "Undersized row should be rejected");
+ let err_msg = result.unwrap_err().to_string();
+ assert!(
+ err_msg.contains("Expected: 4") && err_msg.contains("Actual: 2"),
+ "Error should mention field count mismatch, got: {err_msg}"
+ );
+
+ // Correct column count but wrong types:
+ // Schema is (Boolean, Int, String, BigInt) but we put Int64 where
String is expected.
+ // This should return an error, not panic.
+ let row_wrong_types = fluss::row::GenericRow::from_data(vec![
+ fluss::row::Datum::Bool(true),
+ fluss::row::Datum::Int32(42),
+ fluss::row::Datum::Int64(999), // wrong: String column
+ fluss::row::Datum::Int64(100),
+ ]);
+
+ let result = append_writer.append(&row_wrong_types);
+ assert!(
+ result.is_err(),
+ "Row with mismatched types should be rejected, not panic"
+ );
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
}
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index 210dfc4..fcd6773 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -192,8 +192,18 @@ mod table_remote_scan_test {
let row = record.row();
let expected_c1 = i as i32;
let expected_c2 = format!("v{}", i);
- assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}",
i);
- assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index
{}", i);
+ assert_eq!(
+ row.get_int(1).unwrap(),
+ expected_c1,
+ "c1 mismatch at index {}",
+ i
+ );
+ assert_eq!(
+ row.get_string(0).unwrap(),
+ expected_c2,
+ "c2 mismatch at index {}",
+ i
+ );
}
}