This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 90cc35a chore: Change upsert and delete to use read only self
reference by using mutex to guard mutable encoders (#247)
90cc35a is described below
commit 90cc35a169c0eeb0bf20bb75a304e991bcb3533a
Author: Keith Lee <[email protected]>
AuthorDate: Thu Feb 5 22:29:32 2026 +0000
chore: Change upsert and delete to use read only self reference by using
mutex to guard mutable encoders (#247)
---
crates/examples/src/example_kv_table.rs | 2 +-
.../examples/src/example_partitioned_kv_table.rs | 2 +-
crates/fluss/src/client/table/upsert.rs | 58 +++++++++++++++-------
crates/fluss/tests/integration/kv_table.rs | 12 ++---
4 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 042e384..2bbcc74 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -54,7 +54,7 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
- let mut upsert_writer = table_upsert.create_writer()?;
+ let upsert_writer = table_upsert.create_writer()?;
println!("\n=== Upserting ===");
for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3,
"Esquie", 35)] {
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index d1b6814..884869e 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -61,7 +61,7 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
- let mut upsert_writer = table_upsert.create_writer()?;
+ let upsert_writer = table_upsert.create_writer()?;
println!("\n=== Upserting ===");
for (id, region, zone, score) in [
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index bb6c651..92f6a20 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -16,13 +16,13 @@
// under the License.
use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
-use crate::error::Error::IllegalArgument;
+use crate::error::Error::{IllegalArgument, UnexpectedError};
use crate::error::Result;
use crate::metadata::{RowType, TableInfo, TablePath};
use crate::row::InternalRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder,
RowEncoderFactory};
use crate::row::field_getter::FieldGetter;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use crate::client::table::partition_getter::{PartitionGetter,
get_physical_path};
use bitvec::prelude::bitvec;
@@ -111,12 +111,12 @@ pub struct UpsertWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
partition_field_getter: Option<PartitionGetter>,
- primary_key_encoder: Box<dyn KeyEncoder>,
+ primary_key_encoder: Mutex<Box<dyn KeyEncoder>>,
target_columns: Option<Arc<Vec<usize>>>,
// Use primary key encoder as bucket key encoder when None
- bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+ bucket_key_encoder: Option<Mutex<Box<dyn KeyEncoder>>>,
write_format: WriteFormat,
- row_encoder: Box<dyn RowEncoder>,
+ row_encoder: Mutex<Box<dyn RowEncoder>>,
field_getters: Box<[FieldGetter]>,
table_info: Arc<TableInfo>,
}
@@ -173,11 +173,14 @@ impl UpsertWriterFactory {
table_path,
partition_field_getter,
writer_client,
- primary_key_encoder,
+ primary_key_encoder: Mutex::new(primary_key_encoder),
target_columns: partial_update_columns,
- bucket_key_encoder,
+ bucket_key_encoder: bucket_key_encoder.map(Mutex::new),
write_format,
- row_encoder: Box::new(RowEncoderFactory::create(kv_format,
row_type.clone())?),
+ row_encoder: Mutex::new(Box::new(RowEncoderFactory::create(
+ kv_format,
+ row_type.clone(),
+ )?)),
field_getters,
table_info: table_info.clone(),
})
@@ -294,22 +297,41 @@ impl UpsertWriter {
Ok(())
}
- fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes,
Option<Bytes>)> {
- let key = self.primary_key_encoder.encode_key(row)?;
- let bucket_key = match &mut self.bucket_key_encoder {
- Some(bucket_key_encoder) =>
Some(bucket_key_encoder.encode_key(row)?),
+ fn get_keys(&self, row: &dyn InternalRow) -> Result<(Bytes,
Option<Bytes>)> {
+ let key = self
+ .primary_key_encoder
+ .lock()
+ .map_err(|e| UnexpectedError {
+ message: format!("primary_key_encoder lock poisoned: {e}"),
+ source: None,
+ })?
+ .encode_key(row)?;
+ let bucket_key = match &self.bucket_key_encoder {
+ Some(encoder) => Some(
+ encoder
+ .lock()
+ .map_err(|e| UnexpectedError {
+ message: format!("bucket_key_encoder lock poisoned:
{e}"),
+ source: None,
+ })?
+ .encode_key(row)?,
+ ),
None => Some(key.clone()),
};
Ok((key, bucket_key))
}
- fn encode_row<R: InternalRow>(&mut self, row: &R) -> Result<Bytes> {
- self.row_encoder.start_new_row()?;
+ fn encode_row<R: InternalRow>(&self, row: &R) -> Result<Bytes> {
+ let mut encoder = self.row_encoder.lock().map_err(|e| UnexpectedError {
+ message: format!("row_encoder lock poisoned: {e}"),
+ source: None,
+ })?;
+ encoder.start_new_row()?;
for (pos, field_getter) in self.field_getters.iter().enumerate() {
let datum = field_getter.get_field(row);
- self.row_encoder.encode_field(pos, datum)?;
+ encoder.encode_field(pos, datum)?;
}
- self.row_encoder.finish_row()
+ encoder.finish_row()
}
/// Flush data written that have not yet been sent to the server, forcing
the client to send the
@@ -328,7 +350,7 @@ impl UpsertWriter {
///
/// # Returns
/// Ok(UpsertResult) when completed normally
- pub async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult> {
+ pub async fn upsert<R: InternalRow>(&self, row: &R) ->
Result<UpsertResult> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -367,7 +389,7 @@ impl UpsertWriter {
///
/// # Returns
/// Ok(DeleteResult) when completed normally
- pub async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult> {
+ pub async fn delete<R: InternalRow>(&self, row: &R) ->
Result<DeleteResult> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index c419ed9..87d90b0 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -89,7 +89,7 @@ mod kv_table_test {
.expect("Failed to get table");
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
- let mut upsert_writer = table_upsert
+ let upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");
@@ -246,7 +246,7 @@ mod kv_table_test {
.expect("Failed to get table");
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
- let mut upsert_writer = table_upsert
+ let upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");
@@ -360,7 +360,7 @@ mod kv_table_test {
// Insert initial record with all columns
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
- let mut upsert_writer = table_upsert
+ let upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");
@@ -399,7 +399,7 @@ mod kv_table_test {
let partial_upsert = table_upsert
.partial_update_with_column_names(&["id", "score"])
.expect("Failed to create TableUpsert with partial update");
- let mut partial_writer = partial_upsert
+ let partial_writer = partial_upsert
.create_writer()
.expect("Failed to create UpsertWriter with partial write");
@@ -478,7 +478,7 @@ mod kv_table_test {
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
- let mut upsert_writer = table_upsert
+ let upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");
@@ -659,7 +659,7 @@ mod kv_table_test {
.expect("Failed to get table");
let table_upsert = table.new_upsert().expect("Failed to create
upsert");
- let mut upsert_writer = table_upsert
+ let upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");