This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 017bb15 chore: introduce new with capacity to align generic row with java side (#212)
017bb15 is described below
commit 017bb153ca29b79cd2ee9875daa0cf8beab5bdfe
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 25 21:37:20 2026 +0800
chore: introduce new with capacity to align generic row with java side (#212)
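In short: GenericRow::new now takes a field count and pre-fills every field with
Datum::Null, so set_field overwrites by position instead of inserting, presumably
mirroring the arity-based GenericRow constructor on the Java side. A minimal usage
sketch (field layout and values are illustrative only):

    use fluss::row::GenericRow;

    // Pre-size the row; any field left untouched stays null.
    let mut row = GenericRow::new(3);
    row.set_field(0, 1);        // e.g. primary key
    row.set_field(1, "Verso");  // e.g. name
    row.set_field(2, 32i64);    // e.g. age

    // For KV deletes, setting only the primary key is enough; the rest stay null.
    let mut key = GenericRow::new(3);
    key.set_field(0, 1);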
---
bindings/cpp/src/types.rs | 2 +-
bindings/python/src/table.rs | 20 +++--
crates/examples/src/example_kv_table.rs | 13 ++--
crates/examples/src/example_table.rs | 4 +-
crates/fluss/src/client/table/log_fetch_buffer.rs | 4 +-
crates/fluss/src/client/table/remote_log.rs | 15 ++--
crates/fluss/src/client/table/upsert.rs | 6 +-
crates/fluss/src/metadata/json_serde.rs | 12 +--
crates/fluss/src/metadata/partition.rs | 6 +-
crates/fluss/src/metadata/table.rs | 3 +-
crates/fluss/src/record/arrow.rs | 36 +++------
crates/fluss/src/row/column.rs | 5 +-
.../src/row/compacted/compacted_row_reader.rs | 7 +-
crates/fluss/src/row/datum.rs | 48 ++++--------
crates/fluss/src/row/decimal.rs | 15 ++--
crates/fluss/src/row/mod.rs | 90 +++++++++++++++-------
crates/fluss/tests/integration/kv_table.rs | 34 ++++----
.../fluss/tests/integration/table_remote_scan.rs | 2 +-
18 files changed, 151 insertions(+), 171 deletions(-)
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index fef73ce..726e3d1 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -209,7 +209,7 @@ pub fn empty_table_info() -> ffi::FfiTableInfo {
pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
use fcore::row::Datum;
- let mut generic_row = fcore::row::GenericRow::new();
+ let mut generic_row = fcore::row::GenericRow::new(row.fields.len());
for (idx, field) in row.fields.iter().enumerate() {
let datum = match field.datum_type {
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index b56a29d..0ae7186 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -259,13 +259,13 @@ impl AppendWriter {
// Get the expected Arrow schema from the Fluss table
let row_type = self.table_info.get_row_type();
let expected_schema = fcore::record::to_arrow_schema(row_type)
- .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {}", e)))?;
+ .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {e}")))?;
// Convert Arrow schema to PyArrow schema
let py_schema = expected_schema
.as_ref()
.to_pyarrow(py)
- .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {}", e)))?;
+ .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;
// Import pyarrow module
let pyarrow = py.import("pyarrow")?;
@@ -570,13 +570,12 @@ fn python_decimal_to_datum(
let decimal_str: String = value.str()?.extract()?;
let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
- FlussError::new_err(format!("Failed to parse decimal '{}': {}", decimal_str, e))
+ FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': {e}"))
})?;
let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| {
FlussError::new_err(format!(
- "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
- decimal_str, precision, scale, e
+ "Failed to convert decimal '{decimal_str}' to DECIMAL({precision},
{scale}): {e}"
))
})?;
@@ -641,10 +640,9 @@ fn python_time_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'sta
if microsecond % MICROS_PER_MILLI as i32 != 0 {
return Err(FlussError::new_err(format!(
"TIME values with sub-millisecond precision are not supported. \
- Got time with {} microseconds (not divisible by 1000). \
+ Got time with {microsecond} microseconds (not divisible by 1000). \
Fluss stores TIME as milliseconds since midnight. \
- Please round to milliseconds before insertion.",
- microsecond
+ Please round to milliseconds before insertion."
)));
}
@@ -663,7 +661,7 @@ fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> PyResult<fcore::row
let (epoch_millis, nano_of_milli) = extract_datetime_components_ntz(value)?;
let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, nano_of_milli)
- .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {}", e)))?;
+ .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {e}")))?;
Ok(fcore::row::Datum::TimestampNtz(ts))
}
@@ -675,7 +673,7 @@ fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> PyResult<fcore::row
let (epoch_millis, nano_of_milli) = extract_datetime_components_ltz(value)?;
let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, nano_of_milli)
- .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {}", e)))?;
+ .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {e}")))?;
Ok(fcore::row::Datum::TimestampLtz(ts))
}
@@ -803,7 +801,7 @@ fn datetime_to_epoch_millis_as_utc(
let timestamp = jiff::tz::Offset::UTC
.to_timestamp(civil_dt)
- .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+ .map_err(|e| FlussError::new_err(format!("Invalid datetime: {e}")))?;
let millis = timestamp.as_millisecond();
let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as i32) as i32;
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index dcf7db8..032691e 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -58,7 +58,7 @@ pub async fn main() -> Result<()> {
println!("\n=== Upserting ===");
for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, id);
row.set_field(1, name);
row.set_field(2, age);
@@ -80,7 +80,7 @@ pub async fn main() -> Result<()> {
}
println!("\n=== Updating ===");
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, 1);
row.set_field(1, "Verso");
row.set_field(2, 33i64);
@@ -96,12 +96,11 @@ pub async fn main() -> Result<()> {
);
println!("\n=== Deleting ===");
- let mut row = GenericRow::new();
+ // For delete, only primary key field needs to be set; other fields can remain null
+ let mut row = GenericRow::new(3);
row.set_field(0, 2);
- row.set_field(1, "");
- row.set_field(2, 0i64);
upsert_writer.delete(&row).await?;
- println!("Deleted: {row:?}");
+ println!("Deleted row with id=2");
let result = lookuper.lookup(&make_key(2)).await?;
if result.get_single_row()?.is_none() {
@@ -112,7 +111,7 @@ pub async fn main() -> Result<()> {
}
fn make_key(id: i32) -> GenericRow<'static> {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(1);
row.set_field(0, id);
row
}
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index 7333056..ca6b942 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -56,7 +56,7 @@ pub async fn main() -> Result<()> {
print!("Get created table:\n {table_info}\n");
// write row
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, 22222);
row.set_field(1, "t2t");
row.set_field(2, 123_456_789_123i64);
@@ -64,7 +64,7 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer();
let f1 = append_writer.append(row);
- row = GenericRow::new();
+ row = GenericRow::new(3);
row.set_field(0, 233333);
row.set_field(1, "tt44");
row.set_field(2, 987_654_321_987i64);
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index b529806..7ece34b 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -800,7 +800,7 @@ impl PendingFetch for RemotePendingFetch {
let pos = self.pos_in_log_segment as usize;
if pos >= file_size {
return Err(Error::UnexpectedError {
- message: format!("Position {} exceeds file size {}", pos,
file_size),
+ message: format!("Position {pos} exceeds file size
{file_size}"),
source: None,
});
}
@@ -911,7 +911,7 @@ mod tests {
},
)?;
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
let record = WriteRecord::for_append(table_path, 1, row);
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index c39056d..a2e19d4 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -469,7 +469,7 @@ async fn spawn_download_task(
result_sender: request.result_sender,
}
}
- Err(e) if request.result_sender.is_closed() => {
+ Err(_e) if request.result_sender.is_closed() => {
// Receiver dropped (cancelled) - release permit, don't re-queue
drop(permit);
DownloadResult::Cancelled
@@ -491,8 +491,7 @@ async fn spawn_download_task(
DownloadResult::FailedPermanently {
error: Error::UnexpectedError {
message: format!(
- "Failed to download remote log segment after {}
retries: {}",
- retry_count, e
+ "Failed to download remote log segment after
{retry_count} retries: {e}"
),
source: Some(Box::new(e)),
},
@@ -585,7 +584,7 @@ async fn coordinator_loop(
// Cancelled - permit already released, nothing to do
}
Err(e) => {
- log::error!("Download task panicked: {:?}", e);
+ log::error!("Download task panicked: {e:?}");
// Permit already released via RAII
}
}
@@ -1001,7 +1000,7 @@ mod tests {
if should_fail {
Err(Error::UnexpectedError {
- message: format!("Fake fetch failed for {}",
segment_id),
+ message: format!("Fake fetch failed for {segment_id}"),
source: None,
})
} else {
@@ -1012,7 +1011,7 @@ mod tests {
.unwrap()
.as_nanos();
let file_path =
- temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp));
+ temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log"));
tokio::fs::write(&file_path, &fake_data).await?;
Ok(FetchResult {
@@ -1121,7 +1120,7 @@ mod tests {
// Request 4 segments with same priority (to isolate concurrency limiting from priority)
let segs: Vec<_> = (0..4)
- .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+ .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
.collect();
let _futures: Vec<_> = segs
@@ -1168,7 +1167,7 @@ mod tests {
// Request 4 downloads
let segs: Vec<_> = (0..4)
- .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+ .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
.collect();
let mut futures: Vec<_> = segs
diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs
index a3909e7..984592d 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -232,8 +232,7 @@ impl UpsertWriterFactory {
None => {
return Err(IllegalArgument {
message: format!(
- "The specified primary key {} is not in row type
{}",
- primary_key, row_type
+ "The specified primary key {primary_key} is not in
row type {row_type}"
),
});
}
@@ -250,8 +249,7 @@ impl UpsertWriterFactory {
if target_column_set[index] {
return Err(IllegalArgument {
message: format!(
- "Explicitly specifying values for the auto
increment column {} is not allowed.",
- auto_increment_col_name
+ "Explicitly specifying values for the auto
increment column {auto_increment_col_name} is not allowed."
),
});
}
diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index faa5583..d0d56ef 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -205,7 +205,7 @@ impl JsonSerde for DataType {
DataType::Decimal(
crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale)
.map_err(|e| Error::JsonSerdeError {
- message: format!("Invalid DECIMAL parameters: {}", e),
+ message: format!("Invalid DECIMAL parameters: {e}"),
})?,
)
}
@@ -218,7 +218,7 @@ impl JsonSerde for DataType {
DataType::Time(
crate::metadata::datatype::TimeType::with_nullable(true, precision).map_err(
|e| Error::JsonSerdeError {
- message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {}", e),
+ message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {e}"),
},
)?,
)
@@ -231,10 +231,7 @@ impl JsonSerde for DataType {
DataType::Timestamp(
crate::metadata::datatype::TimestampType::with_nullable(true, precision)
.map_err(|e| Error::JsonSerdeError {
- message: format!(
- "Invalid TIMESTAMP_WITHOUT_TIME_ZONE
precision: {}",
- e
- ),
+ message: format!("Invalid
TIMESTAMP_WITHOUT_TIME_ZONE precision: {e}"),
})?,
)
}
@@ -247,8 +244,7 @@ impl JsonSerde for DataType {
crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision)
.map_err(|e| Error::JsonSerdeError {
message: format!(
- "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE
precision: {}",
- e
+ "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE
precision: {e}"
),
})?,
)
diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs
index 1ecc0dc..e40fbf9 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -131,8 +131,7 @@ impl ResolvedPartitionSpec {
if parts.len() != 2 {
return Err(Error::IllegalArgument {
message: format!(
- "Invalid partition name format. Expected key=value,
got: {}",
- pair
+ "Invalid partition name format. Expected key=value,
got: {pair}"
),
});
}
@@ -199,8 +198,7 @@ impl ResolvedPartitionSpec {
None => {
return Err(Error::IllegalArgument {
message: format!(
- "table does not contain partitionKey: {}",
- other_partition_key
+ "table does not contain partitionKey:
{other_partition_key}"
),
});
}
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index c4a9195..3b9da7d 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -227,8 +227,7 @@ impl SchemaBuilder {
if !column_names.contains(auto_inc_col) {
return Err(IllegalArgument {
message: format!(
- "Auto increment column '{}' is not found in the schema
columns.",
- auto_inc_col
+ "Auto increment column '{auto_inc_col}' is not found
in the schema columns."
),
});
}
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 4bfdc71..63df6de 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -107,7 +107,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
// Check for negative size (corrupted data)
if batch_size_bytes < 0 {
return Err(Error::UnexpectedError {
- message: format!("Invalid negative batch size: {}",
batch_size_bytes),
+ message: format!("Invalid negative batch size:
{batch_size_bytes}"),
source: None,
});
}
@@ -120,8 +120,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
.checked_add(LOG_OVERHEAD)
.ok_or_else(|| Error::UnexpectedError {
message: format!(
- "Batch size {} + LOG_OVERHEAD {} would overflow",
- batch_size_u, LOG_OVERHEAD
+ "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD}
would overflow"
),
source: None,
})?;
@@ -130,8 +129,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
if total_size > MAX_BATCH_SIZE {
return Err(Error::UnexpectedError {
message: format!(
- "Batch size {} exceeds maximum allowed size {}",
- total_size, MAX_BATCH_SIZE
+ "Batch size {total_size} exceeds maximum allowed size
{MAX_BATCH_SIZE}"
),
source: None,
});
@@ -259,8 +257,7 @@ impl RowAppendRecordBatchBuilder {
.with_precision_and_scale(*precision, *scale)
.map_err(|e| Error::IllegalArgument {
message: format!(
- "Invalid decimal precision {} or scale {}: {}",
- precision, scale, e
+ "Invalid decimal precision {precision} or scale
{scale}: {e}"
),
})?;
Ok(Box::new(builder))
@@ -273,8 +270,7 @@ impl RowAppendRecordBatchBuilder {
}
_ => Err(Error::IllegalArgument {
message: format!(
- "Time32 only supports Second and Millisecond units,
got: {:?}",
- unit
+ "Time32 only supports Second and Millisecond units,
got: {unit:?}"
),
}),
},
@@ -285,8 +281,7 @@ impl RowAppendRecordBatchBuilder {
arrow_schema::TimeUnit::Nanosecond => Ok(Box::new(Time64NanosecondBuilder::new())),
_ => Err(Error::IllegalArgument {
message: format!(
- "Time64 only supports Microsecond and Nanosecond
units, got: {:?}",
- unit
+ "Time64 only supports Microsecond and Nanosecond
units, got: {unit:?}"
),
}),
},
@@ -592,10 +587,7 @@ impl FileSource {
// Validate base_offset to prevent underflow in total_size()
if base_offset > file_size {
return Err(Error::UnexpectedError {
- message: format!(
- "base_offset ({}) exceeds file_size ({})",
- base_offset, file_size
- ),
+ message: format!("base_offset ({base_offset}) exceeds
file_size ({file_size})"),
source: None,
});
}
@@ -1044,7 +1036,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
invalid => {
return Err(Error::IllegalArgument {
- message: format!("Invalid precision {} for TimeType (must
be 0-9)", invalid),
+ message: format!("Invalid precision {invalid} for TimeType
(must be 0-9)"),
});
}
},
@@ -1055,10 +1047,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
invalid => {
return Err(Error::IllegalArgument {
- message: format!(
- "Invalid precision {} for TimestampType (must be 0-9)",
- invalid
- ),
+ message: format!("Invalid precision {invalid} for
TimestampType (must be 0-9)"),
});
}
},
@@ -1070,8 +1059,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
invalid => {
return Err(Error::IllegalArgument {
message: format!(
- "Invalid precision {} for TimestampLTzType (must be
0-9)",
- invalid
+ "Invalid precision {invalid} for TimestampLTzType
(must be 0-9)"
),
});
}
@@ -1939,13 +1927,13 @@ mod tests {
},
)?;
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
let record = WriteRecord::for_append(table_path.clone(), 1, row);
builder.append(&record)?;
- let mut row2 = GenericRow::new();
+ let mut row2 = GenericRow::new(2);
row2.set_field(0, 2_i32);
row2.set_field(1, "bob");
let record2 = WriteRecord::for_append(table_path, 2, row2);
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 615e038..46c25b2 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -199,10 +199,7 @@ impl InternalRow for ColumnarRow {
let field = schema.field(pos);
let arrow_scale = match field.data_type() {
DataType::Decimal128(_p, s) => *s as i64,
- dt => panic!(
- "Expected Decimal128 data type at column {}, found: {:?}",
- pos, dt
- ),
+ dt => panic!("Expected Decimal128 data type at column {pos},
found: {dt:?}"),
};
let i128_val = array.value(self.row_id);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 40470db..00e53aa 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -50,7 +50,7 @@ impl<'a> CompactedRowDeserializer<'a> {
}
pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(self.row_type.fields().len());
let mut cursor = reader.initial_position();
for (col_pos, data_field) in self.row_type.fields().iter().enumerate() {
let dtype = &data_field.data_type;
@@ -161,10 +161,7 @@ impl<'a> CompactedRowDeserializer<'a> {
}
}
_ => {
- panic!(
- "Unsupported DataType in CompactedRowDeserializer:
{:?}",
- dtype
- );
+ panic!("Unsupported DataType in CompactedRowDeserializer:
{dtype:?}");
}
};
cursor = next_cursor;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 7b3850f..b808373 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -407,8 +407,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
.checked_mul(MICROS_PER_MILLI)
.ok_or_else(|| RowConvertError {
message: format!(
- "Timestamp milliseconds {} overflows when converting to
microseconds",
- millis
+ "Timestamp milliseconds {millis} overflows when converting to
microseconds"
),
})?;
let nanos_micros = (nanos as i64) / MICROS_PER_MILLI;
@@ -416,8 +415,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
.checked_add(nanos_micros)
.ok_or_else(|| RowConvertError {
message: format!(
- "Timestamp overflow when adding microseconds: {} + {}",
- millis_micros, nanos_micros
+ "Timestamp overflow when adding microseconds: {millis_micros}
+ {nanos_micros}"
),
})
}
@@ -429,16 +427,14 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
.checked_mul(NANOS_PER_MILLI)
.ok_or_else(|| RowConvertError {
message: format!(
- "Timestamp milliseconds {} overflows when converting to
nanoseconds",
- millis
+ "Timestamp milliseconds {millis} overflows when converting to
nanoseconds"
),
})?;
millis_nanos
.checked_add(nanos as i64)
.ok_or_else(|| RowConvertError {
message: format!(
- "Timestamp overflow when adding nanoseconds: {} + {}",
- millis_nanos, nanos
+ "Timestamp overflow when adding nanoseconds: {millis_nanos} +
{nanos}"
),
})
}
@@ -504,10 +500,7 @@ impl Datum<'_> {
arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
_ => {
return Err(RowConvertError {
- message: format!(
- "Expected Decimal128 Arrow type, got: {:?}",
- data_type
- ),
+ message: format!("Expected Decimal128 Arrow type,
got: {data_type:?}"),
});
}
};
@@ -515,7 +508,7 @@ impl Datum<'_> {
// Validate scale is non-negative (Fluss doesn't support negative scales)
if s < 0 {
return Err(RowConvertError {
- message: format!("Negative decimal scale {} is not
supported", s),
+ message: format!("Negative decimal scale {s} is not
supported"),
});
}
@@ -535,8 +528,7 @@ impl Datum<'_> {
if actual_precision > target_precision as usize {
return Err(RowConvertError {
message: format!(
- "Decimal precision overflow: value has {}
digits but Arrow expects {} (value: {})",
- actual_precision, target_precision, rescaled
+ "Decimal precision overflow: value has
{actual_precision} digits but Arrow expects {target_precision} (value:
{rescaled})"
),
});
}
@@ -546,7 +538,7 @@ impl Datum<'_> {
Ok(v) => v,
Err(_) => {
return Err(RowConvertError {
- message: format!("Decimal value exceeds i128
range: {}", rescaled),
+ message: format!("Decimal value exceeds i128
range: {rescaled}"),
});
}
};
@@ -575,8 +567,7 @@ impl Datum<'_> {
if millis % MILLIS_PER_SECOND as i32 != 0 {
return Err(RowConvertError {
message: format!(
- "Time value {} ms has sub-second
precision but schema expects seconds only",
- millis
+ "Time value {millis} ms has sub-second
precision but schema expects seconds only"
),
});
}
@@ -602,8 +593,7 @@ impl Datum<'_> {
.checked_mul(MICROS_PER_MILLI)
.ok_or_else(|| RowConvertError {
message: format!(
- "Time value {} ms overflows when
converting to microseconds",
- millis
+ "Time value {millis} ms overflows when
converting to microseconds"
),
})?;
b.append_value(micros);
@@ -618,8 +608,7 @@ impl Datum<'_> {
let nanos = (millis as i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
|| RowConvertError {
message: format!(
- "Time value {} ms overflows when
converting to nanoseconds",
- millis
+ "Time value {millis} ms overflows when
converting to nanoseconds"
),
},
)?;
@@ -630,8 +619,7 @@ impl Datum<'_> {
_ => {
return Err(RowConvertError {
message: format!(
- "Expected Time32/Time64 Arrow type, got: {:?}",
- data_type
+ "Expected Time32/Time64 Arrow type, got:
{data_type:?}"
),
});
}
@@ -808,8 +796,7 @@ impl TimestampNtz {
if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
return Err(crate::error::Error::IllegalArgument {
message: format!(
- "nanoOfMillisecond must be in range [0, {}], got: {}",
- MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+ "nanoOfMillisecond must be in range [0,
{MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
),
});
}
@@ -856,8 +843,7 @@ impl TimestampLtz {
if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
return Err(crate::error::Error::IllegalArgument {
message: format!(
- "nanoOfMillisecond must be in range [0, {}], got: {}",
- MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+ "nanoOfMillisecond must be in range [0,
{MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
),
});
}
@@ -1030,10 +1016,8 @@ mod timestamp_tests {
#[test]
fn test_timestamp_nanos_out_of_range() {
// Test that both TimestampNtz and TimestampLtz reject invalid nanos
- let expected_msg = format!(
- "nanoOfMillisecond must be in range [0, {}]",
- MAX_NANO_OF_MILLISECOND
- );
+ let expected_msg =
+ format!("nanoOfMillisecond must be in range [0,
{MAX_NANO_OF_MILLISECOND}]");
// Too large (1,000,000 is just beyond the valid range)
let result_ntz = TimestampNtz::from_millis_nanos(1000,
MAX_NANO_OF_MILLISECOND + 1);
diff --git a/crates/fluss/src/row/decimal.rs b/crates/fluss/src/row/decimal.rs
index b14bde5..fd21b82 100644
--- a/crates/fluss/src/row/decimal.rs
+++ b/crates/fluss/src/row/decimal.rs
@@ -129,16 +129,14 @@ impl Decimal {
// Sanity check that scale matches
debug_assert_eq!(
exp, scale as i64,
- "Scaled decimal exponent ({}) != expected scale ({})",
- exp, scale
+ "Scaled decimal exponent ({exp}) != expected scale ({scale})"
);
let actual_precision = Self::compute_precision(&unscaled);
if actual_precision > precision as usize {
return Err(Error::IllegalArgument {
message: format!(
- "Decimal precision overflow: value has {} digits but
precision is {} (value: {})",
- actual_precision, precision, scaled
+ "Decimal precision overflow: value has {actual_precision}
digits but precision is {precision} (value: {scaled})"
),
});
}
@@ -147,8 +145,7 @@ impl Decimal {
let long_val = if precision <= MAX_COMPACT_PRECISION {
Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument {
message: format!(
- "Decimal mantissa exceeds i64 range for compact precision
{}: unscaled={} (value={})",
- precision, unscaled, scaled
+ "Decimal mantissa exceeds i64 range for compact precision
{precision}: unscaled={unscaled} (value={scaled})"
),
})?)
} else {
@@ -168,8 +165,7 @@ impl Decimal {
if precision > MAX_COMPACT_PRECISION {
return Err(Error::IllegalArgument {
message: format!(
- "Precision {} exceeds MAX_COMPACT_PRECISION ({})",
- precision, MAX_COMPACT_PRECISION
+ "Precision {precision} exceeds MAX_COMPACT_PRECISION
({MAX_COMPACT_PRECISION})"
),
});
}
@@ -178,8 +174,7 @@ impl Decimal {
if actual_precision > precision as usize {
return Err(Error::IllegalArgument {
message: format!(
- "Decimal precision overflow: unscaled value has {} digits
but precision is {}",
- actual_precision, precision
+ "Decimal precision overflow: unscaled value has
{actual_precision} digits but precision is {precision}"
),
});
}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 81a4254..f7c8bec 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -159,6 +159,23 @@ impl<'a> InternalRow for GenericRow<'a> {
self.values.get(_pos).unwrap().try_into().unwrap()
}
+ fn get_float(&self, pos: usize) -> f32 {
+ self.values.get(pos).unwrap().try_into().unwrap()
+ }
+
+ fn get_double(&self, pos: usize) -> f64 {
+ self.values.get(pos).unwrap().try_into().unwrap()
+ }
+
+ fn get_char(&self, pos: usize, _length: usize) -> &str {
+ // don't check length, following java client
+ self.get_string(pos)
+ }
+
+ fn get_string(&self, pos: usize) -> &str {
+ self.values.get(pos).unwrap().try_into().unwrap()
+ }
+
fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> Decimal {
match self.values.get(pos).unwrap() {
Datum::Decimal(d) => d.clone(),
@@ -196,23 +213,6 @@ impl<'a> InternalRow for GenericRow<'a> {
}
}
- fn get_float(&self, pos: usize) -> f32 {
- self.values.get(pos).unwrap().try_into().unwrap()
- }
-
- fn get_double(&self, pos: usize) -> f64 {
- self.values.get(pos).unwrap().try_into().unwrap()
- }
-
- fn get_char(&self, pos: usize, _length: usize) -> &str {
- // don't check length, following java client
- self.get_string(pos)
- }
-
- fn get_string(&self, pos: usize) -> &str {
- self.values.get(pos).unwrap().try_into().unwrap()
- }
-
fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
self.values.get(pos).unwrap().as_blob()
}
@@ -222,24 +222,39 @@ impl<'a> InternalRow for GenericRow<'a> {
}
}
-impl<'a> Default for GenericRow<'a> {
- fn default() -> Self {
- Self::new()
- }
-}
-
impl<'a> GenericRow<'a> {
pub fn from_data(data: Vec<impl Into<Datum<'a>>>) -> GenericRow<'a> {
GenericRow {
values: data.into_iter().map(Into::into).collect(),
}
}
- pub fn new() -> GenericRow<'a> {
- GenericRow { values: vec![] }
+
+ /// Creates a GenericRow with the specified number of fields, all initialized to null.
+ ///
+ /// This is useful when you need to create a row with a specific field count
+ /// but only want to set some fields (e.g., for KV delete operations where
+ /// only primary key fields need to be set).
+ ///
+ /// # Example
+ /// ```
+ /// use fluss::row::GenericRow;
+ ///
+ /// let mut row = GenericRow::new(3);
+ /// row.set_field(0, 42); // Only set the primary key
+ /// // Fields 1 and 2 remain null
+ /// ```
+ pub fn new(field_count: usize) -> GenericRow<'a> {
+ GenericRow {
+ values: vec![Datum::Null; field_count],
+ }
}
+ /// Sets the field at the given position to the specified value.
+ ///
+ /// # Panics
+ /// Panics if `pos` is out of bounds (>= field count).
pub fn set_field(&mut self, pos: usize, value: impl Into<Datum<'a>>) {
- self.values.insert(pos, value.into());
+ self.values[pos] = value.into();
}
}
@@ -249,11 +264,32 @@ mod tests {
#[test]
fn is_null_at_checks_datum_nullity() {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(2);
row.set_field(0, Datum::Null);
row.set_field(1, 42_i32);
assert!(row.is_null_at(0));
assert!(!row.is_null_at(1));
}
+
+ #[test]
+ fn new_initializes_nulls() {
+ let row = GenericRow::new(3);
+ assert_eq!(row.get_field_count(), 3);
+ assert!(row.is_null_at(0));
+ assert!(row.is_null_at(1));
+ assert!(row.is_null_at(2));
+ }
+
+ #[test]
+ fn partial_row_for_delete() {
+ // Simulates delete scenario: only primary key (field 0) is set
+ let mut row = GenericRow::new(3);
+ row.set_field(0, 123_i32);
+ // Fields 1 and 2 remain null
+ assert_eq!(row.get_field_count(), 3);
+ assert_eq!(row.get_int(0), 123);
+ assert!(row.is_null_at(1));
+ assert!(row.is_null_at(2));
+ }
}
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 3f46f9f..a4f2961 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -53,10 +53,8 @@ mod kv_table_test {
}
fn make_key(id: i32) -> GenericRow<'static> {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, id);
- row.set_field(1, "");
- row.set_field(2, 0i64);
row
}
@@ -98,7 +96,7 @@ mod kv_table_test {
// Upsert rows
for (id, name, age) in &test_data {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, *id);
row.set_field(1, *name);
row.set_field(2, *age);
@@ -132,7 +130,7 @@ mod kv_table_test {
}
// Update the record with new age
- let mut updated_row = GenericRow::new();
+ let mut updated_row = GenericRow::new(3);
updated_row.set_field(0, 1);
updated_row.set_field(1, "Verso");
updated_row.set_field(2, 33i64);
@@ -162,10 +160,8 @@ mod kv_table_test {
);
// Delete record with id=1
- let mut delete_row = GenericRow::new();
+ let mut delete_row = GenericRow::new(3);
delete_row.set_field(0, 1);
- delete_row.set_field(1, "");
- delete_row.set_field(2, 0i64);
upsert_writer
.delete(&delete_row)
.await
@@ -262,7 +258,7 @@ mod kv_table_test {
];
for (region, user_id, score) in &test_data {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(3);
row.set_field(0, *region);
row.set_field(1, *user_id);
row.set_field(2, *score);
@@ -277,7 +273,7 @@ mod kv_table_test {
.expect("Failed to create lookuper");
// Lookup (US, 1) - should return score 100
- let mut key = GenericRow::new();
+ let mut key = GenericRow::new(3);
key.set_field(0, "US");
key.set_field(1, 1);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -288,7 +284,7 @@ mod kv_table_test {
assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
// Lookup (EU, 2) - should return score 250
- let mut key = GenericRow::new();
+ let mut key = GenericRow::new(3);
key.set_field(0, "EU");
key.set_field(1, 2);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -299,7 +295,7 @@ mod kv_table_test {
assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
// Update (US, 1) score
- let mut update_row = GenericRow::new();
+ let mut update_row = GenericRow::new(3);
update_row.set_field(0, "US");
update_row.set_field(1, 1);
update_row.set_field(2, 500i64);
@@ -309,7 +305,7 @@ mod kv_table_test {
.expect("Failed to update");
// Verify update
- let mut key = GenericRow::new();
+ let mut key = GenericRow::new(3);
key.set_field(0, "US");
key.set_field(1, 1);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -367,7 +363,7 @@ mod kv_table_test {
.create_writer()
.expect("Failed to create writer");
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(4);
row.set_field(0, 1);
row.set_field(1, "Verso");
row.set_field(2, 32i64);
@@ -407,7 +403,7 @@ mod kv_table_test {
.expect("Failed to create UpsertWriter with partial write");
// Update only the score column
- let mut partial_row = GenericRow::new();
+ let mut partial_row = GenericRow::new(4);
partial_row.set_field(0, 1);
partial_row.set_field(1, Datum::Null); // not in partial update column
partial_row.set_field(2, Datum::Null); // not in partial update column
@@ -522,7 +518,7 @@ mod kv_table_test {
let col_binary: &[u8] = b"fixed binary data!!!";
// Upsert a row with all datatypes
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(17);
row.set_field(0, pk_int);
row.set_field(1, col_boolean);
row.set_field(2, col_tinyint);
@@ -553,7 +549,7 @@ mod kv_table_test {
.create_lookuper()
.expect("Failed to create lookuper");
- let mut key = GenericRow::new();
+ let mut key = GenericRow::new(17);
key.set_field(0, pk_int);
let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -625,7 +621,7 @@ mod kv_table_test {
// Test with null values for nullable columns
let pk_int_2 = 2i32;
- let mut row_with_nulls = GenericRow::new();
+ let mut row_with_nulls = GenericRow::new(17);
row_with_nulls.set_field(0, pk_int_2);
row_with_nulls.set_field(1, Datum::Null); // col_boolean
row_with_nulls.set_field(2, Datum::Null); // col_tinyint
@@ -650,7 +646,7 @@ mod kv_table_test {
.expect("Failed to upsert row with nulls");
// Lookup row with nulls
- let mut key2 = GenericRow::new();
+ let mut key2 = GenericRow::new(17);
key2.set_field(0, pk_int_2);
let result = lookuper.lookup(&key2).await.expect("Failed to lookup");
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index e28a836..c83da0f 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -146,7 +146,7 @@ mod table_remote_scan_test {
// append 20 rows, there must be some tiered to remote
let record_count = 20;
for i in 0..record_count {
- let mut row = GenericRow::new();
+ let mut row = GenericRow::new(2);
row.set_field(0, i as i32);
let v = format!("v{}", i);
row.set_field(1, v.as_str());