This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 90cc35a  chore: Change upsert and delete to use read only self 
reference by using mutex to guard mutable encoders (#247)
90cc35a is described below

commit 90cc35a169c0eeb0bf20bb75a304e991bcb3533a
Author: Keith Lee <[email protected]>
AuthorDate: Thu Feb 5 22:29:32 2026 +0000

    chore: Change upsert and delete to use read only self reference by using 
mutex to guard mutable encoders (#247)
---
 crates/examples/src/example_kv_table.rs            |  2 +-
 .../examples/src/example_partitioned_kv_table.rs   |  2 +-
 crates/fluss/src/client/table/upsert.rs            | 58 +++++++++++++++-------
 crates/fluss/tests/integration/kv_table.rs         | 12 ++---
 4 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/crates/examples/src/example_kv_table.rs 
b/crates/examples/src/example_kv_table.rs
index 042e384..2bbcc74 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -54,7 +54,7 @@ pub async fn main() -> Result<()> {
 
     let table = conn.get_table(&table_path).await?;
     let table_upsert = table.new_upsert()?;
-    let mut upsert_writer = table_upsert.create_writer()?;
+    let upsert_writer = table_upsert.create_writer()?;
 
     println!("\n=== Upserting ===");
     for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, 
"Esquie", 35)] {
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
index d1b6814..884869e 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -61,7 +61,7 @@ pub async fn main() -> Result<()> {
 
     let table = conn.get_table(&table_path).await?;
     let table_upsert = table.new_upsert()?;
-    let mut upsert_writer = table_upsert.create_writer()?;
+    let upsert_writer = table_upsert.create_writer()?;
 
     println!("\n=== Upserting ===");
     for (id, region, zone, score) in [
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index bb6c651..92f6a20 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -16,13 +16,13 @@
 // under the License.
 
 use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
-use crate::error::Error::IllegalArgument;
+use crate::error::Error::{IllegalArgument, UnexpectedError};
 use crate::error::Result;
 use crate::metadata::{RowType, TableInfo, TablePath};
 use crate::row::InternalRow;
 use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, 
RowEncoderFactory};
 use crate::row::field_getter::FieldGetter;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use crate::client::table::partition_getter::{PartitionGetter, 
get_physical_path};
 use bitvec::prelude::bitvec;
@@ -111,12 +111,12 @@ pub struct UpsertWriter {
     table_path: Arc<TablePath>,
     writer_client: Arc<WriterClient>,
     partition_field_getter: Option<PartitionGetter>,
-    primary_key_encoder: Box<dyn KeyEncoder>,
+    primary_key_encoder: Mutex<Box<dyn KeyEncoder>>,
     target_columns: Option<Arc<Vec<usize>>>,
     // Use primary key encoder as bucket key encoder when None
-    bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+    bucket_key_encoder: Option<Mutex<Box<dyn KeyEncoder>>>,
     write_format: WriteFormat,
-    row_encoder: Box<dyn RowEncoder>,
+    row_encoder: Mutex<Box<dyn RowEncoder>>,
     field_getters: Box<[FieldGetter]>,
     table_info: Arc<TableInfo>,
 }
@@ -173,11 +173,14 @@ impl UpsertWriterFactory {
             table_path,
             partition_field_getter,
             writer_client,
-            primary_key_encoder,
+            primary_key_encoder: Mutex::new(primary_key_encoder),
             target_columns: partial_update_columns,
-            bucket_key_encoder,
+            bucket_key_encoder: bucket_key_encoder.map(Mutex::new),
             write_format,
-            row_encoder: Box::new(RowEncoderFactory::create(kv_format, 
row_type.clone())?),
+            row_encoder: Mutex::new(Box::new(RowEncoderFactory::create(
+                kv_format,
+                row_type.clone(),
+            )?)),
             field_getters,
             table_info: table_info.clone(),
         })
@@ -294,22 +297,41 @@ impl UpsertWriter {
         Ok(())
     }
 
-    fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes, 
Option<Bytes>)> {
-        let key = self.primary_key_encoder.encode_key(row)?;
-        let bucket_key = match &mut self.bucket_key_encoder {
-            Some(bucket_key_encoder) => 
Some(bucket_key_encoder.encode_key(row)?),
+    fn get_keys(&self, row: &dyn InternalRow) -> Result<(Bytes, 
Option<Bytes>)> {
+        let key = self
+            .primary_key_encoder
+            .lock()
+            .map_err(|e| UnexpectedError {
+                message: format!("primary_key_encoder lock poisoned: {e}"),
+                source: None,
+            })?
+            .encode_key(row)?;
+        let bucket_key = match &self.bucket_key_encoder {
+            Some(encoder) => Some(
+                encoder
+                    .lock()
+                    .map_err(|e| UnexpectedError {
+                        message: format!("bucket_key_encoder lock poisoned: 
{e}"),
+                        source: None,
+                    })?
+                    .encode_key(row)?,
+            ),
             None => Some(key.clone()),
         };
         Ok((key, bucket_key))
     }
 
-    fn encode_row<R: InternalRow>(&mut self, row: &R) -> Result<Bytes> {
-        self.row_encoder.start_new_row()?;
+    fn encode_row<R: InternalRow>(&self, row: &R) -> Result<Bytes> {
+        let mut encoder = self.row_encoder.lock().map_err(|e| UnexpectedError {
+            message: format!("row_encoder lock poisoned: {e}"),
+            source: None,
+        })?;
+        encoder.start_new_row()?;
         for (pos, field_getter) in self.field_getters.iter().enumerate() {
             let datum = field_getter.get_field(row);
-            self.row_encoder.encode_field(pos, datum)?;
+            encoder.encode_field(pos, datum)?;
         }
-        self.row_encoder.finish_row()
+        encoder.finish_row()
     }
 
     /// Flush data written that have not yet been sent to the server, forcing 
the client to send the
@@ -328,7 +350,7 @@ impl UpsertWriter {
     ///
     /// # Returns
     /// Ok(UpsertResult) when completed normally
-    pub async fn upsert<R: InternalRow>(&mut self, row: &R) -> 
Result<UpsertResult> {
+    pub async fn upsert<R: InternalRow>(&self, row: &R) -> 
Result<UpsertResult> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -367,7 +389,7 @@ impl UpsertWriter {
     ///
     /// # Returns
     /// Ok(DeleteResult) when completed normally
-    pub async fn delete<R: InternalRow>(&mut self, row: &R) -> 
Result<DeleteResult> {
+    pub async fn delete<R: InternalRow>(&self, row: &R) -> 
Result<DeleteResult> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
diff --git a/crates/fluss/tests/integration/kv_table.rs 
b/crates/fluss/tests/integration/kv_table.rs
index c419ed9..87d90b0 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -89,7 +89,7 @@ mod kv_table_test {
             .expect("Failed to get table");
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
-        let mut upsert_writer = table_upsert
+        let upsert_writer = table_upsert
             .create_writer()
             .expect("Failed to create writer");
 
@@ -246,7 +246,7 @@ mod kv_table_test {
             .expect("Failed to get table");
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
-        let mut upsert_writer = table_upsert
+        let upsert_writer = table_upsert
             .create_writer()
             .expect("Failed to create writer");
 
@@ -360,7 +360,7 @@ mod kv_table_test {
 
         // Insert initial record with all columns
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
-        let mut upsert_writer = table_upsert
+        let upsert_writer = table_upsert
             .create_writer()
             .expect("Failed to create writer");
 
@@ -399,7 +399,7 @@ mod kv_table_test {
         let partial_upsert = table_upsert
             .partial_update_with_column_names(&["id", "score"])
             .expect("Failed to create TableUpsert with partial update");
-        let mut partial_writer = partial_upsert
+        let partial_writer = partial_upsert
             .create_writer()
             .expect("Failed to create UpsertWriter with partial write");
 
@@ -478,7 +478,7 @@ mod kv_table_test {
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
 
-        let mut upsert_writer = table_upsert
+        let upsert_writer = table_upsert
             .create_writer()
             .expect("Failed to create writer");
 
@@ -659,7 +659,7 @@ mod kv_table_test {
             .expect("Failed to get table");
 
         let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
-        let mut upsert_writer = table_upsert
+        let upsert_writer = table_upsert
             .create_writer()
             .expect("Failed to create writer");
 

Reply via email to