This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 017bb15  chore: introduce new with capacity to align generic row with java side (#212)
017bb15 is described below

commit 017bb153ca29b79cd2ee9875daa0cf8beab5bdfe
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 25 21:37:20 2026 +0800

    chore: introduce new with capacity to align generic row with java side (#212)
---
 bindings/cpp/src/types.rs                          |  2 +-
 bindings/python/src/table.rs                       | 20 +++--
 crates/examples/src/example_kv_table.rs            | 13 ++--
 crates/examples/src/example_table.rs               |  4 +-
 crates/fluss/src/client/table/log_fetch_buffer.rs  |  4 +-
 crates/fluss/src/client/table/remote_log.rs        | 15 ++--
 crates/fluss/src/client/table/upsert.rs            |  6 +-
 crates/fluss/src/metadata/json_serde.rs            | 12 +--
 crates/fluss/src/metadata/partition.rs             |  6 +-
 crates/fluss/src/metadata/table.rs                 |  3 +-
 crates/fluss/src/record/arrow.rs                   | 36 +++------
 crates/fluss/src/row/column.rs                     |  5 +-
 .../src/row/compacted/compacted_row_reader.rs      |  7 +-
 crates/fluss/src/row/datum.rs                      | 48 ++++--------
 crates/fluss/src/row/decimal.rs                    | 15 ++--
 crates/fluss/src/row/mod.rs                        | 90 +++++++++++++++-------
 crates/fluss/tests/integration/kv_table.rs         | 34 ++++----
 .../fluss/tests/integration/table_remote_scan.rs   |  2 +-
 18 files changed, 151 insertions(+), 171 deletions(-)

diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index fef73ce..726e3d1 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -209,7 +209,7 @@ pub fn empty_table_info() -> ffi::FfiTableInfo {
 pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
     use fcore::row::Datum;
 
-    let mut generic_row = fcore::row::GenericRow::new();
+    let mut generic_row = fcore::row::GenericRow::new(row.fields.len());
 
     for (idx, field) in row.fields.iter().enumerate() {
         let datum = match field.datum_type {
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index b56a29d..0ae7186 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -259,13 +259,13 @@ impl AppendWriter {
         // Get the expected Arrow schema from the Fluss table
         let row_type = self.table_info.get_row_type();
         let expected_schema = fcore::record::to_arrow_schema(row_type)
-            .map_err(|e| FlussError::new_err(format!("Failed to get table 
schema: {}", e)))?;
+            .map_err(|e| FlussError::new_err(format!("Failed to get table 
schema: {e}")))?;
 
         // Convert Arrow schema to PyArrow schema
         let py_schema = expected_schema
             .as_ref()
             .to_pyarrow(py)
-            .map_err(|e| FlussError::new_err(format!("Failed to convert 
schema: {}", e)))?;
+            .map_err(|e| FlussError::new_err(format!("Failed to convert 
schema: {e}")))?;
 
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
@@ -570,13 +570,12 @@ fn python_decimal_to_datum(
 
     let decimal_str: String = value.str()?.extract()?;
     let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
-        FlussError::new_err(format!("Failed to parse decimal '{}': {}", 
decimal_str, e))
+        FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': 
{e}"))
     })?;
 
     let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, 
scale).map_err(|e| {
         FlussError::new_err(format!(
-            "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
-            decimal_str, precision, scale, e
+            "Failed to convert decimal '{decimal_str}' to DECIMAL({precision}, 
{scale}): {e}"
         ))
     })?;
 
@@ -641,10 +640,9 @@ fn python_time_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'sta
     if microsecond % MICROS_PER_MILLI as i32 != 0 {
         return Err(FlussError::new_err(format!(
             "TIME values with sub-millisecond precision are not supported. \
-             Got time with {} microseconds (not divisible by 1000). \
+             Got time with {microsecond} microseconds (not divisible by 1000). 
\
              Fluss stores TIME as milliseconds since midnight. \
-             Please round to milliseconds before insertion.",
-            microsecond
+             Please round to milliseconds before insertion."
         )));
     }
 
@@ -663,7 +661,7 @@ fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) 
-> PyResult<fcore::row
     let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ntz(value)?;
 
     let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
-        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampNtz: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampNtz: {e}")))?;
 
     Ok(fcore::row::Datum::TimestampNtz(ts))
 }
@@ -675,7 +673,7 @@ fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) 
-> PyResult<fcore::row
     let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ltz(value)?;
 
     let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
-        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampLtz: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampLtz: {e}")))?;
 
     Ok(fcore::row::Datum::TimestampLtz(ts))
 }
@@ -803,7 +801,7 @@ fn datetime_to_epoch_millis_as_utc(
 
     let timestamp = jiff::tz::Offset::UTC
         .to_timestamp(civil_dt)
-        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {e}")))?;
 
     let millis = timestamp.as_millisecond();
     let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as 
i32) as i32;
diff --git a/crates/examples/src/example_kv_table.rs 
b/crates/examples/src/example_kv_table.rs
index dcf7db8..032691e 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -58,7 +58,7 @@ pub async fn main() -> Result<()> {
 
     println!("\n=== Upserting ===");
     for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(3);
         row.set_field(0, id);
         row.set_field(1, name);
         row.set_field(2, age);
@@ -80,7 +80,7 @@ pub async fn main() -> Result<()> {
     }
 
     println!("\n=== Updating ===");
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(3);
     row.set_field(0, 1);
     row.set_field(1, "Verso");
     row.set_field(2, 33i64);
@@ -96,12 +96,11 @@ pub async fn main() -> Result<()> {
     );
 
     println!("\n=== Deleting ===");
-    let mut row = GenericRow::new();
+    // For delete, only primary key field needs to be set; other fields can remain null
+    let mut row = GenericRow::new(3);
     row.set_field(0, 2);
-    row.set_field(1, "");
-    row.set_field(2, 0i64);
     upsert_writer.delete(&row).await?;
-    println!("Deleted: {row:?}");
+    println!("Deleted row with id=2");
 
     let result = lookuper.lookup(&make_key(2)).await?;
     if result.get_single_row()?.is_none() {
@@ -112,7 +111,7 @@ pub async fn main() -> Result<()> {
 }
 
 fn make_key(id: i32) -> GenericRow<'static> {
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(1);
     row.set_field(0, id);
     row
 }
diff --git a/crates/examples/src/example_table.rs 
b/crates/examples/src/example_table.rs
index 7333056..ca6b942 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -56,7 +56,7 @@ pub async fn main() -> Result<()> {
     print!("Get created table:\n {table_info}\n");
 
     // write row
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(3);
     row.set_field(0, 22222);
     row.set_field(1, "t2t");
     row.set_field(2, 123_456_789_123i64);
@@ -64,7 +64,7 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await?;
     let append_writer = table.new_append()?.create_writer();
     let f1 = append_writer.append(row);
-    row = GenericRow::new();
+    row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index b529806..7ece34b 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -800,7 +800,7 @@ impl PendingFetch for RemotePendingFetch {
             let pos = self.pos_in_log_segment as usize;
             if pos >= file_size {
                 return Err(Error::UnexpectedError {
-                    message: format!("Position {} exceeds file size {}", pos, 
file_size),
+                    message: format!("Position {pos} exceeds file size 
{file_size}"),
                     source: None,
                 });
             }
@@ -911,7 +911,7 @@ mod tests {
             },
         )?;
 
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
         let record = WriteRecord::for_append(table_path, 1, row);
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index c39056d..a2e19d4 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -469,7 +469,7 @@ async fn spawn_download_task(
                 result_sender: request.result_sender,
             }
         }
-        Err(e) if request.result_sender.is_closed() => {
+        Err(_e) if request.result_sender.is_closed() => {
             // Receiver dropped (cancelled) - release permit, don't re-queue
             drop(permit);
             DownloadResult::Cancelled
@@ -491,8 +491,7 @@ async fn spawn_download_task(
                 DownloadResult::FailedPermanently {
                     error: Error::UnexpectedError {
                         message: format!(
-                            "Failed to download remote log segment after {} 
retries: {}",
-                            retry_count, e
+                            "Failed to download remote log segment after 
{retry_count} retries: {e}"
                         ),
                         source: Some(Box::new(e)),
                     },
@@ -585,7 +584,7 @@ async fn coordinator_loop(
                         // Cancelled - permit already released, nothing to do
                     }
                     Err(e) => {
-                        log::error!("Download task panicked: {:?}", e);
+                        log::error!("Download task panicked: {e:?}");
                         // Permit already released via RAII
                     }
                 }
@@ -1001,7 +1000,7 @@ mod tests {
 
                 if should_fail {
                     Err(Error::UnexpectedError {
-                        message: format!("Fake fetch failed for {}", 
segment_id),
+                        message: format!("Fake fetch failed for {segment_id}"),
                         source: None,
                     })
                 } else {
@@ -1012,7 +1011,7 @@ mod tests {
                         .unwrap()
                         .as_nanos();
                     let file_path =
-                        temp_dir.join(format!("fake_segment_{}_{}.log", 
segment_id, timestamp));
+                        
temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log"));
                     tokio::fs::write(&file_path, &fake_data).await?;
 
                     Ok(FetchResult {
@@ -1121,7 +1120,7 @@ mod tests {
 
         // Request 4 segments with same priority (to isolate concurrency 
limiting from priority)
         let segs: Vec<_> = (0..4)
-            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, 
bucket.clone()))
+            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, 
bucket.clone()))
             .collect();
 
         let _futures: Vec<_> = segs
@@ -1168,7 +1167,7 @@ mod tests {
 
         // Request 4 downloads
         let segs: Vec<_> = (0..4)
-            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, 
bucket.clone()))
+            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, 
bucket.clone()))
             .collect();
 
         let mut futures: Vec<_> = segs
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index a3909e7..984592d 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -232,8 +232,7 @@ impl UpsertWriterFactory {
                 None => {
                     return Err(IllegalArgument {
                         message: format!(
-                            "The specified primary key {} is not in row type 
{}",
-                            primary_key, row_type
+                            "The specified primary key {primary_key} is not in 
row type {row_type}"
                         ),
                     });
                 }
@@ -250,8 +249,7 @@ impl UpsertWriterFactory {
                 if target_column_set[index] {
                     return Err(IllegalArgument {
                         message: format!(
-                            "Explicitly specifying values for the auto 
increment column {} is not allowed.",
-                            auto_increment_col_name
+                            "Explicitly specifying values for the auto 
increment column {auto_increment_col_name} is not allowed."
                         ),
                     });
                 }
diff --git a/crates/fluss/src/metadata/json_serde.rs 
b/crates/fluss/src/metadata/json_serde.rs
index faa5583..d0d56ef 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -205,7 +205,7 @@ impl JsonSerde for DataType {
                 DataType::Decimal(
                     
crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale)
                         .map_err(|e| Error::JsonSerdeError {
-                        message: format!("Invalid DECIMAL parameters: {}", e),
+                        message: format!("Invalid DECIMAL parameters: {e}"),
                     })?,
                 )
             }
@@ -218,7 +218,7 @@ impl JsonSerde for DataType {
                 DataType::Time(
                     crate::metadata::datatype::TimeType::with_nullable(true, 
precision).map_err(
                         |e| Error::JsonSerdeError {
-                            message: format!("Invalid TIME_WITHOUT_TIME_ZONE 
precision: {}", e),
+                            message: format!("Invalid TIME_WITHOUT_TIME_ZONE 
precision: {e}"),
                         },
                     )?,
                 )
@@ -231,10 +231,7 @@ impl JsonSerde for DataType {
                 DataType::Timestamp(
                     
crate::metadata::datatype::TimestampType::with_nullable(true, precision)
                         .map_err(|e| Error::JsonSerdeError {
-                            message: format!(
-                                "Invalid TIMESTAMP_WITHOUT_TIME_ZONE 
precision: {}",
-                                e
-                            ),
+                            message: format!("Invalid 
TIMESTAMP_WITHOUT_TIME_ZONE precision: {e}"),
                         })?,
                 )
             }
@@ -247,8 +244,7 @@ impl JsonSerde for DataType {
                     
crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision)
                         .map_err(|e| Error::JsonSerdeError {
                             message: format!(
-                                "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE 
precision: {}",
-                                e
+                                "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE 
precision: {e}"
                             ),
                         })?,
                 )
diff --git a/crates/fluss/src/metadata/partition.rs 
b/crates/fluss/src/metadata/partition.rs
index 1ecc0dc..e40fbf9 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -131,8 +131,7 @@ impl ResolvedPartitionSpec {
             if parts.len() != 2 {
                 return Err(Error::IllegalArgument {
                     message: format!(
-                        "Invalid partition name format. Expected key=value, 
got: {}",
-                        pair
+                        "Invalid partition name format. Expected key=value, 
got: {pair}"
                     ),
                 });
             }
@@ -199,8 +198,7 @@ impl ResolvedPartitionSpec {
                 None => {
                     return Err(Error::IllegalArgument {
                         message: format!(
-                            "table does not contain partitionKey: {}",
-                            other_partition_key
+                            "table does not contain partitionKey: 
{other_partition_key}"
                         ),
                     });
                 }
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index c4a9195..3b9da7d 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -227,8 +227,7 @@ impl SchemaBuilder {
             if !column_names.contains(auto_inc_col) {
                 return Err(IllegalArgument {
                     message: format!(
-                        "Auto increment column '{}' is not found in the schema 
columns.",
-                        auto_inc_col
+                        "Auto increment column '{auto_inc_col}' is not found 
in the schema columns."
                     ),
                 });
             }
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 4bfdc71..63df6de 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -107,7 +107,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> 
Result<usize> {
     // Check for negative size (corrupted data)
     if batch_size_bytes < 0 {
         return Err(Error::UnexpectedError {
-            message: format!("Invalid negative batch size: {}", 
batch_size_bytes),
+            message: format!("Invalid negative batch size: 
{batch_size_bytes}"),
             source: None,
         });
     }
@@ -120,8 +120,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> 
Result<usize> {
             .checked_add(LOG_OVERHEAD)
             .ok_or_else(|| Error::UnexpectedError {
                 message: format!(
-                    "Batch size {} + LOG_OVERHEAD {} would overflow",
-                    batch_size_u, LOG_OVERHEAD
+                    "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD} 
would overflow"
                 ),
                 source: None,
             })?;
@@ -130,8 +129,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> 
Result<usize> {
     if total_size > MAX_BATCH_SIZE {
         return Err(Error::UnexpectedError {
             message: format!(
-                "Batch size {} exceeds maximum allowed size {}",
-                total_size, MAX_BATCH_SIZE
+                "Batch size {total_size} exceeds maximum allowed size 
{MAX_BATCH_SIZE}"
             ),
             source: None,
         });
@@ -259,8 +257,7 @@ impl RowAppendRecordBatchBuilder {
                     .with_precision_and_scale(*precision, *scale)
                     .map_err(|e| Error::IllegalArgument {
                         message: format!(
-                            "Invalid decimal precision {} or scale {}: {}",
-                            precision, scale, e
+                            "Invalid decimal precision {precision} or scale 
{scale}: {e}"
                         ),
                     })?;
                 Ok(Box::new(builder))
@@ -273,8 +270,7 @@ impl RowAppendRecordBatchBuilder {
                 }
                 _ => Err(Error::IllegalArgument {
                     message: format!(
-                        "Time32 only supports Second and Millisecond units, 
got: {:?}",
-                        unit
+                        "Time32 only supports Second and Millisecond units, 
got: {unit:?}"
                     ),
                 }),
             },
@@ -285,8 +281,7 @@ impl RowAppendRecordBatchBuilder {
                 arrow_schema::TimeUnit::Nanosecond => 
Ok(Box::new(Time64NanosecondBuilder::new())),
                 _ => Err(Error::IllegalArgument {
                     message: format!(
-                        "Time64 only supports Microsecond and Nanosecond 
units, got: {:?}",
-                        unit
+                        "Time64 only supports Microsecond and Nanosecond 
units, got: {unit:?}"
                     ),
                 }),
             },
@@ -592,10 +587,7 @@ impl FileSource {
         // Validate base_offset to prevent underflow in total_size()
         if base_offset > file_size {
             return Err(Error::UnexpectedError {
-                message: format!(
-                    "base_offset ({}) exceeds file_size ({})",
-                    base_offset, file_size
-                ),
+                message: format!("base_offset ({base_offset}) exceeds 
file_size ({file_size})"),
                 source: None,
             });
         }
@@ -1044,7 +1036,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
Result<ArrowDataType> {
             7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
             invalid => {
                 return Err(Error::IllegalArgument {
-                    message: format!("Invalid precision {} for TimeType (must 
be 0-9)", invalid),
+                    message: format!("Invalid precision {invalid} for TimeType 
(must be 0-9)"),
                 });
             }
         },
@@ -1055,10 +1047,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
Result<ArrowDataType> {
             7..=9 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
             invalid => {
                 return Err(Error::IllegalArgument {
-                    message: format!(
-                        "Invalid precision {} for TimestampType (must be 0-9)",
-                        invalid
-                    ),
+                    message: format!("Invalid precision {invalid} for 
TimestampType (must be 0-9)"),
                 });
             }
         },
@@ -1070,8 +1059,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
Result<ArrowDataType> {
             invalid => {
                 return Err(Error::IllegalArgument {
                     message: format!(
-                        "Invalid precision {} for TimestampLTzType (must be 
0-9)",
-                        invalid
+                        "Invalid precision {invalid} for TimestampLTzType 
(must be 0-9)"
                     ),
                 });
             }
@@ -1939,13 +1927,13 @@ mod tests {
             },
         )?;
 
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
         let record = WriteRecord::for_append(table_path.clone(), 1, row);
         builder.append(&record)?;
 
-        let mut row2 = GenericRow::new();
+        let mut row2 = GenericRow::new(2);
         row2.set_field(0, 2_i32);
         row2.set_field(1, "bob");
         let record2 = WriteRecord::for_append(table_path, 2, row2);
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 615e038..46c25b2 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -199,10 +199,7 @@ impl InternalRow for ColumnarRow {
         let field = schema.field(pos);
         let arrow_scale = match field.data_type() {
             DataType::Decimal128(_p, s) => *s as i64,
-            dt => panic!(
-                "Expected Decimal128 data type at column {}, found: {:?}",
-                pos, dt
-            ),
+            dt => panic!("Expected Decimal128 data type at column {pos}, 
found: {dt:?}"),
         };
 
         let i128_val = array.value(self.row_id);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 40470db..00e53aa 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -50,7 +50,7 @@ impl<'a> CompactedRowDeserializer<'a> {
     }
 
     pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> 
GenericRow<'a> {
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(self.row_type.fields().len());
         let mut cursor = reader.initial_position();
         for (col_pos, data_field) in self.row_type.fields().iter().enumerate() 
{
             let dtype = &data_field.data_type;
@@ -161,10 +161,7 @@ impl<'a> CompactedRowDeserializer<'a> {
                     }
                 }
                 _ => {
-                    panic!(
-                        "Unsupported DataType in CompactedRowDeserializer: 
{:?}",
-                        dtype
-                    );
+                    panic!("Unsupported DataType in CompactedRowDeserializer: 
{dtype:?}");
                 }
             };
             cursor = next_cursor;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 7b3850f..b808373 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -407,8 +407,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> 
Result<i64> {
         .checked_mul(MICROS_PER_MILLI)
         .ok_or_else(|| RowConvertError {
             message: format!(
-                "Timestamp milliseconds {} overflows when converting to 
microseconds",
-                millis
+                "Timestamp milliseconds {millis} overflows when converting to 
microseconds"
             ),
         })?;
     let nanos_micros = (nanos as i64) / MICROS_PER_MILLI;
@@ -416,8 +415,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> 
Result<i64> {
         .checked_add(nanos_micros)
         .ok_or_else(|| RowConvertError {
             message: format!(
-                "Timestamp overflow when adding microseconds: {} + {}",
-                millis_micros, nanos_micros
+                "Timestamp overflow when adding microseconds: {millis_micros} 
+ {nanos_micros}"
             ),
         })
 }
@@ -429,16 +427,14 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> 
Result<i64> {
         .checked_mul(NANOS_PER_MILLI)
         .ok_or_else(|| RowConvertError {
             message: format!(
-                "Timestamp milliseconds {} overflows when converting to 
nanoseconds",
-                millis
+                "Timestamp milliseconds {millis} overflows when converting to 
nanoseconds"
             ),
         })?;
     millis_nanos
         .checked_add(nanos as i64)
         .ok_or_else(|| RowConvertError {
             message: format!(
-                "Timestamp overflow when adding nanoseconds: {} + {}",
-                millis_nanos, nanos
+                "Timestamp overflow when adding nanoseconds: {millis_nanos} + 
{nanos}"
             ),
         })
 }
@@ -504,10 +500,7 @@ impl Datum<'_> {
                     arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
                     _ => {
                         return Err(RowConvertError {
-                            message: format!(
-                                "Expected Decimal128 Arrow type, got: {:?}",
-                                data_type
-                            ),
+                            message: format!("Expected Decimal128 Arrow type, 
got: {data_type:?}"),
                         });
                     }
                 };
@@ -515,7 +508,7 @@ impl Datum<'_> {
                 // Validate scale is non-negative (Fluss doesn't support 
negative scales)
                 if s < 0 {
                     return Err(RowConvertError {
-                        message: format!("Negative decimal scale {} is not 
supported", s),
+                        message: format!("Negative decimal scale {s} is not 
supported"),
                     });
                 }
 
@@ -535,8 +528,7 @@ impl Datum<'_> {
                     if actual_precision > target_precision as usize {
                         return Err(RowConvertError {
                             message: format!(
-                                "Decimal precision overflow: value has {} 
digits but Arrow expects {} (value: {})",
-                                actual_precision, target_precision, rescaled
+                                "Decimal precision overflow: value has 
{actual_precision} digits but Arrow expects {target_precision} (value: 
{rescaled})"
                             ),
                         });
                     }
@@ -546,7 +538,7 @@ impl Datum<'_> {
                         Ok(v) => v,
                         Err(_) => {
                             return Err(RowConvertError {
-                                message: format!("Decimal value exceeds i128 
range: {}", rescaled),
+                                message: format!("Decimal value exceeds i128 
range: {rescaled}"),
                             });
                         }
                     };
@@ -575,8 +567,7 @@ impl Datum<'_> {
                             if millis % MILLIS_PER_SECOND as i32 != 0 {
                                 return Err(RowConvertError {
                                     message: format!(
-                                        "Time value {} ms has sub-second 
precision but schema expects seconds only",
-                                        millis
+                                        "Time value {millis} ms has sub-second 
precision but schema expects seconds only"
                                     ),
                                 });
                             }
@@ -602,8 +593,7 @@ impl Datum<'_> {
                                 .checked_mul(MICROS_PER_MILLI)
                                 .ok_or_else(|| RowConvertError {
                                     message: format!(
-                                        "Time value {} ms overflows when 
converting to microseconds",
-                                        millis
+                                        "Time value {millis} ms overflows when 
converting to microseconds"
                                     ),
                                 })?;
                             b.append_value(micros);
@@ -618,8 +608,7 @@ impl Datum<'_> {
                             let nanos = (millis as 
i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
                                 || RowConvertError {
                                     message: format!(
-                                        "Time value {} ms overflows when 
converting to nanoseconds",
-                                        millis
+                                        "Time value {millis} ms overflows when 
converting to nanoseconds"
                                     ),
                                 },
                             )?;
@@ -630,8 +619,7 @@ impl Datum<'_> {
                     _ => {
                         return Err(RowConvertError {
                             message: format!(
-                                "Expected Time32/Time64 Arrow type, got: {:?}",
-                                data_type
+                                "Expected Time32/Time64 Arrow type, got: {data_type:?}"
                             ),
                         });
                     }
@@ -808,8 +796,7 @@ impl TimestampNtz {
         if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
             return Err(crate::error::Error::IllegalArgument {
                 message: format!(
-                    "nanoOfMillisecond must be in range [0, {}], got: {}",
-                    MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+                    "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
                 ),
             });
         }
@@ -856,8 +843,7 @@ impl TimestampLtz {
         if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
             return Err(crate::error::Error::IllegalArgument {
                 message: format!(
-                    "nanoOfMillisecond must be in range [0, {}], got: {}",
-                    MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+                    "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
                 ),
             });
         }
@@ -1030,10 +1016,8 @@ mod timestamp_tests {
     #[test]
     fn test_timestamp_nanos_out_of_range() {
         // Test that both TimestampNtz and TimestampLtz reject invalid nanos
-        let expected_msg = format!(
-            "nanoOfMillisecond must be in range [0, {}]",
-            MAX_NANO_OF_MILLISECOND
-        );
+        let expected_msg =
+            format!("nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}]");
 
         // Too large (1,000,000 is just beyond the valid range)
         let result_ntz = TimestampNtz::from_millis_nanos(1000, MAX_NANO_OF_MILLISECOND + 1);
diff --git a/crates/fluss/src/row/decimal.rs b/crates/fluss/src/row/decimal.rs
index b14bde5..fd21b82 100644
--- a/crates/fluss/src/row/decimal.rs
+++ b/crates/fluss/src/row/decimal.rs
@@ -129,16 +129,14 @@ impl Decimal {
         // Sanity check that scale matches
         debug_assert_eq!(
             exp, scale as i64,
-            "Scaled decimal exponent ({}) != expected scale ({})",
-            exp, scale
+            "Scaled decimal exponent ({exp}) != expected scale ({scale})"
         );
 
         let actual_precision = Self::compute_precision(&unscaled);
         if actual_precision > precision as usize {
             return Err(Error::IllegalArgument {
                 message: format!(
-                    "Decimal precision overflow: value has {} digits but precision is {} (value: {})",
-                    actual_precision, precision, scaled
+                    "Decimal precision overflow: value has {actual_precision} digits but precision is {precision} (value: {scaled})"
                 ),
             });
         }
@@ -147,8 +145,7 @@ impl Decimal {
         let long_val = if precision <= MAX_COMPACT_PRECISION {
             Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument {
                 message: format!(
-                    "Decimal mantissa exceeds i64 range for compact precision {}: unscaled={} (value={})",
-                    precision, unscaled, scaled
+                    "Decimal mantissa exceeds i64 range for compact precision {precision}: unscaled={unscaled} (value={scaled})"
                 ),
             })?)
         } else {
@@ -168,8 +165,7 @@ impl Decimal {
         if precision > MAX_COMPACT_PRECISION {
             return Err(Error::IllegalArgument {
                 message: format!(
-                    "Precision {} exceeds MAX_COMPACT_PRECISION ({})",
-                    precision, MAX_COMPACT_PRECISION
+                    "Precision {precision} exceeds MAX_COMPACT_PRECISION ({MAX_COMPACT_PRECISION})"
                 ),
             });
         }
@@ -178,8 +174,7 @@ impl Decimal {
         if actual_precision > precision as usize {
             return Err(Error::IllegalArgument {
                 message: format!(
-                    "Decimal precision overflow: unscaled value has {} digits but precision is {}",
-                    actual_precision, precision
+                    "Decimal precision overflow: unscaled value has {actual_precision} digits but precision is {precision}"
                 ),
             });
         }
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 81a4254..f7c8bec 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -159,6 +159,23 @@ impl<'a> InternalRow for GenericRow<'a> {
         self.values.get(_pos).unwrap().try_into().unwrap()
     }
 
+    fn get_float(&self, pos: usize) -> f32 {
+        self.values.get(pos).unwrap().try_into().unwrap()
+    }
+
+    fn get_double(&self, pos: usize) -> f64 {
+        self.values.get(pos).unwrap().try_into().unwrap()
+    }
+
+    fn get_char(&self, pos: usize, _length: usize) -> &str {
+        // don't check length, following java client
+        self.get_string(pos)
+    }
+
+    fn get_string(&self, pos: usize) -> &str {
+        self.values.get(pos).unwrap().try_into().unwrap()
+    }
+
     fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> Decimal {
         match self.values.get(pos).unwrap() {
             Datum::Decimal(d) => d.clone(),
@@ -196,23 +213,6 @@ impl<'a> InternalRow for GenericRow<'a> {
         }
     }
 
-    fn get_float(&self, pos: usize) -> f32 {
-        self.values.get(pos).unwrap().try_into().unwrap()
-    }
-
-    fn get_double(&self, pos: usize) -> f64 {
-        self.values.get(pos).unwrap().try_into().unwrap()
-    }
-
-    fn get_char(&self, pos: usize, _length: usize) -> &str {
-        // don't check length, following java client
-        self.get_string(pos)
-    }
-
-    fn get_string(&self, pos: usize) -> &str {
-        self.values.get(pos).unwrap().try_into().unwrap()
-    }
-
     fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
         self.values.get(pos).unwrap().as_blob()
     }
@@ -222,24 +222,39 @@ impl<'a> InternalRow for GenericRow<'a> {
     }
 }
 
-impl<'a> Default for GenericRow<'a> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl<'a> GenericRow<'a> {
     pub fn from_data(data: Vec<impl Into<Datum<'a>>>) -> GenericRow<'a> {
         GenericRow {
             values: data.into_iter().map(Into::into).collect(),
         }
     }
-    pub fn new() -> GenericRow<'a> {
-        GenericRow { values: vec![] }
+
+    /// Creates a GenericRow with the specified number of fields, all initialized to null.
+    ///
+    /// This is useful when you need to create a row with a specific field count
+    /// but only want to set some fields (e.g., for KV delete operations where
+    /// only primary key fields need to be set).
+    ///
+    /// # Example
+    /// ```
+    /// use fluss::row::GenericRow;
+    ///
+    /// let mut row = GenericRow::new(3);
+    /// row.set_field(0, 42); // Only set the primary key
+    /// // Fields 1 and 2 remain null
+    /// ```
+    pub fn new(field_count: usize) -> GenericRow<'a> {
+        GenericRow {
+            values: vec![Datum::Null; field_count],
+        }
     }
 
+    /// Sets the field at the given position to the specified value.
+    ///
+    /// # Panics
+    /// Panics if `pos` is out of bounds (>= field count).
     pub fn set_field(&mut self, pos: usize, value: impl Into<Datum<'a>>) {
-        self.values.insert(pos, value.into());
+        self.values[pos] = value.into();
     }
 }
 
@@ -249,11 +264,32 @@ mod tests {
 
     #[test]
     fn is_null_at_checks_datum_nullity() {
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(2);
         row.set_field(0, Datum::Null);
         row.set_field(1, 42_i32);
 
         assert!(row.is_null_at(0));
         assert!(!row.is_null_at(1));
     }
+
+    #[test]
+    fn new_initializes_nulls() {
+        let row = GenericRow::new(3);
+        assert_eq!(row.get_field_count(), 3);
+        assert!(row.is_null_at(0));
+        assert!(row.is_null_at(1));
+        assert!(row.is_null_at(2));
+    }
+
+    #[test]
+    fn partial_row_for_delete() {
+        // Simulates delete scenario: only primary key (field 0) is set
+        let mut row = GenericRow::new(3);
+        row.set_field(0, 123_i32);
+        // Fields 1 and 2 remain null
+        assert_eq!(row.get_field_count(), 3);
+        assert_eq!(row.get_int(0), 123);
+        assert!(row.is_null_at(1));
+        assert!(row.is_null_at(2));
+    }
 }
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 3f46f9f..a4f2961 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -53,10 +53,8 @@ mod kv_table_test {
     }
 
     fn make_key(id: i32) -> GenericRow<'static> {
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(3);
         row.set_field(0, id);
-        row.set_field(1, "");
-        row.set_field(2, 0i64);
         row
     }
 
@@ -98,7 +96,7 @@ mod kv_table_test {
 
         // Upsert rows
         for (id, name, age) in &test_data {
-            let mut row = GenericRow::new();
+            let mut row = GenericRow::new(3);
             row.set_field(0, *id);
             row.set_field(1, *name);
             row.set_field(2, *age);
@@ -132,7 +130,7 @@ mod kv_table_test {
         }
 
         // Update the record with new age
-        let mut updated_row = GenericRow::new();
+        let mut updated_row = GenericRow::new(3);
         updated_row.set_field(0, 1);
         updated_row.set_field(1, "Verso");
         updated_row.set_field(2, 33i64);
@@ -162,10 +160,8 @@ mod kv_table_test {
         );
 
         // Delete record with id=1
-        let mut delete_row = GenericRow::new();
+        let mut delete_row = GenericRow::new(3);
         delete_row.set_field(0, 1);
-        delete_row.set_field(1, "");
-        delete_row.set_field(2, 0i64);
         upsert_writer
             .delete(&delete_row)
             .await
@@ -262,7 +258,7 @@ mod kv_table_test {
         ];
 
         for (region, user_id, score) in &test_data {
-            let mut row = GenericRow::new();
+            let mut row = GenericRow::new(3);
             row.set_field(0, *region);
             row.set_field(1, *user_id);
             row.set_field(2, *score);
@@ -277,7 +273,7 @@ mod kv_table_test {
             .expect("Failed to create lookuper");
 
         // Lookup (US, 1) - should return score 100
-        let mut key = GenericRow::new();
+        let mut key = GenericRow::new(3);
         key.set_field(0, "US");
         key.set_field(1, 1);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -288,7 +284,7 @@ mod kv_table_test {
         assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
 
         // Lookup (EU, 2) - should return score 250
-        let mut key = GenericRow::new();
+        let mut key = GenericRow::new(3);
         key.set_field(0, "EU");
         key.set_field(1, 2);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -299,7 +295,7 @@ mod kv_table_test {
         assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
 
         // Update (US, 1) score
-        let mut update_row = GenericRow::new();
+        let mut update_row = GenericRow::new(3);
         update_row.set_field(0, "US");
         update_row.set_field(1, 1);
         update_row.set_field(2, 500i64);
@@ -309,7 +305,7 @@ mod kv_table_test {
             .expect("Failed to update");
 
         // Verify update
-        let mut key = GenericRow::new();
+        let mut key = GenericRow::new(3);
         key.set_field(0, "US");
         key.set_field(1, 1);
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -367,7 +363,7 @@ mod kv_table_test {
             .create_writer()
             .expect("Failed to create writer");
 
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(4);
         row.set_field(0, 1);
         row.set_field(1, "Verso");
         row.set_field(2, 32i64);
@@ -407,7 +403,7 @@ mod kv_table_test {
             .expect("Failed to create UpsertWriter with partial write");
 
         // Update only the score column
-        let mut partial_row = GenericRow::new();
+        let mut partial_row = GenericRow::new(4);
         partial_row.set_field(0, 1);
         partial_row.set_field(1, Datum::Null); // not in partial update column
         partial_row.set_field(2, Datum::Null); // not in partial update column
@@ -522,7 +518,7 @@ mod kv_table_test {
         let col_binary: &[u8] = b"fixed binary data!!!";
 
         // Upsert a row with all datatypes
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(17);
         row.set_field(0, pk_int);
         row.set_field(1, col_boolean);
         row.set_field(2, col_tinyint);
@@ -553,7 +549,7 @@ mod kv_table_test {
             .create_lookuper()
             .expect("Failed to create lookuper");
 
-        let mut key = GenericRow::new();
+        let mut key = GenericRow::new(17);
         key.set_field(0, pk_int);
 
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -625,7 +621,7 @@ mod kv_table_test {
 
         // Test with null values for nullable columns
         let pk_int_2 = 2i32;
-        let mut row_with_nulls = GenericRow::new();
+        let mut row_with_nulls = GenericRow::new(17);
         row_with_nulls.set_field(0, pk_int_2);
         row_with_nulls.set_field(1, Datum::Null); // col_boolean
         row_with_nulls.set_field(2, Datum::Null); // col_tinyint
@@ -650,7 +646,7 @@ mod kv_table_test {
             .expect("Failed to upsert row with nulls");
 
         // Lookup row with nulls
-        let mut key2 = GenericRow::new();
+        let mut key2 = GenericRow::new(17);
         key2.set_field(0, pk_int_2);
 
         let result = lookuper.lookup(&key2).await.expect("Failed to lookup");
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index e28a836..c83da0f 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -146,7 +146,7 @@ mod table_remote_scan_test {
         // append 20 rows, there must be some tiered to remote
         let record_count = 20;
         for i in 0..record_count {
-            let mut row = GenericRow::new();
+            let mut row = GenericRow::new(2);
             row.set_field(0, i as i32);
             let v = format!("v{}", i);
             row.set_field(1, v.as_str());


Reply via email to