This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e1e7ba3 chore: cleanup and fix partial update in python (#292)
e1e7ba3 is described below
commit e1e7ba313ed738e842eec0fe55e396abd722639f
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 07:14:26 2026 +0000
chore: cleanup and fix partial update in python (#292)
---
bindings/python/example/example.py | 43 ++++++++
bindings/python/fluss/__init__.pyi | 7 +-
bindings/python/src/lib.rs | 1 +
bindings/python/src/lookup.rs | 5 +-
bindings/python/src/table.rs | 195 +++++++++--------------------------
bindings/python/src/upsert.rs | 41 ++++++--
crates/fluss/src/client/table/mod.rs | 1 +
7 files changed, 128 insertions(+), 165 deletions(-)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 732b7df..9f8cafa 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -569,6 +569,49 @@ async def main():
print(f"Error during delete: {e}")
traceback.print_exc()
+ # --- Test Partial Update by column names ---
+ print("\n--- Testing Partial Update (by column names) ---")
+ try:
+ partial_writer = pk_table.new_upsert(columns=["user_id", "balance"])
+ handle = partial_writer.upsert({"user_id": 1, "balance":
Decimal("9999.99")})
+ await handle.wait()
+ print("Partial update: set balance=9999.99 for user_id=1")
+
+ lookuper = pk_table.new_lookup()
+ result = await lookuper.lookup({"user_id": 1})
+ if result:
+ print(f"Partial update verified:"
+ f"\n name={result['name']} (unchanged)"
+ f"\n balance={result['balance']} (updated)")
+ else:
+ print("ERROR: Expected to find user_id=1")
+
+ except Exception as e:
+ print(f"Error during partial update by names: {e}")
+ traceback.print_exc()
+
+ # --- Test Partial Update by column indices ---
+ print("\n--- Testing Partial Update (by column indices) ---")
+ try:
+ # Columns: 0=user_id (PK), 1=name — update name only
+ partial_writer_idx = pk_table.new_upsert(column_indices=[0, 1])
+ handle = partial_writer_idx.upsert([1, "Alice Renamed"])
+ await handle.wait()
+ print("Partial update by indices: set name='Alice Renamed' for
user_id=1")
+
+ lookuper = pk_table.new_lookup()
+ result = await lookuper.lookup({"user_id": 1})
+ if result:
+ print(f"Partial update by indices verified:"
+ f"\n name={result['name']} (updated)"
+ f"\n balance={result['balance']} (unchanged)")
+ else:
+ print("ERROR: Expected to find user_id=1")
+
+ except Exception as e:
+ print(f"Error during partial update by indices: {e}")
+ traceback.print_exc()
+
# Demo: Column projection using builder pattern
print("\n--- Testing Column Projection ---")
try:
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index a9ef828..cc7053e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -378,15 +378,14 @@ class AppendWriter:
WriteResultHandle: Ignore for fire-and-forget, or await
handle.wait() for acknowledgement.
Supported Types:
- Currently supports primitive types only:
- Boolean, TinyInt, SmallInt, Int, BigInt (integers)
- Float, Double (floating point)
- String, Char (text)
- Bytes, Binary (binary data)
+ - Date, Time, Timestamp, TimestampLTZ (temporal)
+ - Decimal (arbitrary precision)
- Null values
- Temporal types (Date, Timestamp, Decimal) are not yet supported.
-
Example:
writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
writer.append([1, 'Alice', 95.5])
@@ -712,5 +711,7 @@ class OffsetType:
# Constant for earliest offset (-2)
EARLIEST_OFFSET: int
+# Constant for latest offset (-1)
+LATEST_OFFSET: int
__version__: str
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 41f8de5..094dc00 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -96,6 +96,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Register constants
m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
+ m.add("LATEST_OFFSET", fcore::client::LATEST_OFFSET)?;
// Register exception types
m.add_class::<FlussError>()?;
diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
index 8d91a61..e5c1f62 100644
--- a/bindings/python/src/lookup.rs
+++ b/bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
+use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
@@ -52,7 +52,8 @@ impl Lookuper {
py: Python<'py>,
pk: &Bound<'_, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
- let generic_row = python_pk_to_generic_row(pk, &self.table_info)?;
+ let pk_indices = self.table_info.get_schema().primary_key_indexes();
+ let generic_row = python_to_sparse_generic_row(pk, &self.table_info,
&pk_indices)?;
let inner = self.inner.clone();
let table_info = self.table_info.clone();
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index cb203dc..81acf00 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -639,124 +639,51 @@ enum RowInput<'py> {
List(Bound<'py, pyo3::types::PyList>),
}
-/// Helper function to process sequence types (list/tuple) into datums
-fn process_sequence_to_datums<'a, I>(
- values: I,
- len: usize,
- fields: &[fcore::metadata::DataField],
-) -> PyResult<Vec<fcore::row::Datum<'static>>>
-where
- I: Iterator<Item = Bound<'a, PyAny>>,
-{
- if len != fields.len() {
- return Err(FlussError::new_err(format!(
- "Expected {} values, got {}",
- fields.len(),
- len
- )));
- }
-
- let mut datums = Vec::with_capacity(fields.len());
- for (i, (field, value)) in fields.iter().zip(values).enumerate() {
- datums.push(
- python_value_to_datum(&value, field.data_type()).map_err(|e| {
- FlussError::new_err(format!("Field '{}' (index {}): {}",
field.name(), i, e))
- })?,
- );
- }
- Ok(datums)
-}
-
-/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema columns.
pub fn python_to_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
) -> PyResult<fcore::row::GenericRow<'static>> {
- // Extract with user-friendly error message
- let row_input: RowInput = row.extract().map_err(|_| {
- let type_name = row
- .get_type()
- .name()
- .map(|n| n.to_string())
- .unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!(
- "Row must be a dict, list, or tuple; got {type_name}"
- ))
- })?;
- let schema = table_info.row_type();
- let fields = schema.fields();
-
- let datums = match row_input {
- RowInput::Dict(dict) => {
- // Strict: reject unknown keys (and also reject non-str keys
nicely)
- for (k, _) in dict.iter() {
- let key_str = k.extract::<&str>().map_err(|_| {
- let key_type = k
- .get_type()
- .name()
- .map(|n| n.to_string())
- .unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!("Row dict keys must be
strings; got {key_type}"))
- })?;
-
- if fields.iter().all(|f| f.name() != key_str) {
- let expected = fields
- .iter()
- .map(|f| f.name())
- .collect::<Vec<_>>()
- .join(", ");
- return Err(FlussError::new_err(format!(
- "Unknown field '{key_str}'. Expected fields:
{expected}"
- )));
- }
- }
-
- let mut datums = Vec::with_capacity(fields.len());
- for field in fields {
- let value = dict.get_item(field.name())?.ok_or_else(|| {
- FlussError::new_err(format!("Missing field: {}",
field.name()))
- })?;
- datums.push(
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("Field '{}': {}",
field.name(), e))
- })?,
- );
- }
- datums
- }
-
- RowInput::List(list) => process_sequence_to_datums(list.iter(),
list.len(), fields)?,
-
- RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(),
tuple.len(), fields)?,
- };
+ let all_indices: Vec<usize> =
(0..table_info.row_type().fields().len()).collect();
+ python_to_sparse_generic_row(row, table_info, &all_indices)
+}
- Ok(fcore::row::GenericRow { values: datums })
+/// Process a Python sequence (list or tuple) into datums at the target column positions.
+fn process_sequence(
+ seq: &Bound<pyo3::types::PySequence>,
+ target_indices: &[usize],
+ fields: &[fcore::metadata::DataField],
+ datums: &mut [fcore::row::Datum<'static>],
+) -> PyResult<()> {
+ if seq.len()? != target_indices.len() {
+ return Err(FlussError::new_err(format!(
+ "Expected {} elements, got {}",
+ target_indices.len(),
+ seq.len()?
+ )));
+ }
+ for (i, &col_idx) in target_indices.iter().enumerate() {
+ let field = &fields[col_idx];
+ let value = seq.get_item(i)?;
+ datums[col_idx] = python_value_to_datum(&value, field.data_type())
+ .map_err(|e| FlussError::new_err(format!("Field '{}': {}",
field.name(), e)))?;
+ }
+ Ok(())
}
-/// Convert Python primary key values (dict/list/tuple) to GenericRow.
-/// Only requires PK columns; non-PK columns are filled with Null.
-/// For dict: keys should be PK column names.
-/// For list/tuple: values should be PK values in PK column order.
-pub fn python_pk_to_generic_row(
+/// Build a full-width GenericRow filling only the specified column
+/// indices from user input; all other columns are set to Null.
+pub fn python_to_sparse_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
+ target_indices: &[usize],
) -> PyResult<fcore::row::GenericRow<'static>> {
- let schema = table_info.get_schema();
let row_type = table_info.row_type();
let fields = row_type.fields();
- let pk_indexes = schema.primary_key_indexes();
- let pk_names: Vec<&str> = schema.primary_key_column_names();
-
- if pk_indexes.is_empty() {
- return Err(FlussError::new_err(
- "Table has no primary key; cannot use PK-only row",
- ));
- }
+ let target_names: Vec<&str> = target_indices.iter().map(|&i|
fields[i].name()).collect();
- // Initialize all datums as Null
let mut datums: Vec<fcore::row::Datum<'static>> =
vec![fcore::row::Datum::Null; fields.len()];
- // Extract with user-friendly error message
let row_input: RowInput = row.extract().map_err(|_| {
let type_name = row
.get_type()
@@ -764,13 +691,12 @@ pub fn python_pk_to_generic_row(
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
FlussError::new_err(format!(
- "PK row must be a dict, list, or tuple; got {type_name}"
+ "Row must be a dict, list, or tuple; got {type_name}"
))
})?;
match row_input {
RowInput::Dict(dict) => {
- // Validate keys are PK columns
for (k, _) in dict.iter() {
let key_str = k.extract::<&str>().map_err(|_| {
let key_type = k
@@ -778,64 +704,35 @@ pub fn python_pk_to_generic_row(
.name()
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!("PK dict keys must be strings;
got {key_type}"))
+ FlussError::new_err(format!("Dict keys must be strings;
got {key_type}"))
})?;
-
- if !pk_names.contains(&key_str) {
+ if !target_names.contains(&key_str) {
return Err(FlussError::new_err(format!(
- "Unknown PK field '{}'. Expected PK fields: {}",
+ "Unknown field '{}'. Expected: {}",
key_str,
- pk_names.join(", ")
+ target_names.join(", ")
)));
}
}
-
- // Extract PK values
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let pk_name = pk_names[i];
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ for (i, &col_idx) in target_indices.iter().enumerate() {
+ let name = target_names[i];
+ let field = &fields[col_idx];
let value = dict
- .get_item(pk_name)?
- .ok_or_else(|| FlussError::new_err(format!("Missing PK
field: {pk_name}")))?;
- datums[*pk_idx] = python_value_to_datum(&value,
field.data_type())
- .map_err(|e| FlussError::new_err(format!("PK field
'{pk_name}': {e}")))?;
+ .get_item(name)?
+ .ok_or_else(|| FlussError::new_err(format!("Missing field:
{name}")))?;
+ datums[col_idx] = python_value_to_datum(&value,
field.data_type())
+ .map_err(|e| FlussError::new_err(format!("Field '{name}':
{e}")))?;
}
}
RowInput::List(list) => {
- if list.len() != pk_indexes.len() {
- return Err(FlussError::new_err(format!(
- "PK list must have {} elements (PK columns), got {}",
- pk_indexes.len(),
- list.len()
- )));
- }
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
- let value = list.get_item(i)?;
- datums[*pk_idx] =
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
- })?;
- }
+ let seq = list.as_sequence();
+ process_sequence(seq, target_indices, fields, &mut datums)?;
}
RowInput::Tuple(tuple) => {
- if tuple.len() != pk_indexes.len() {
- return Err(FlussError::new_err(format!(
- "PK tuple must have {} elements (PK columns), got {}",
- pk_indexes.len(),
- tuple.len()
- )));
- }
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
- let value = tuple.get_item(i)?;
- datums[*pk_idx] =
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
- })?;
- }
+ let seq = tuple.as_sequence();
+ process_sequence(seq, target_indices, fields, &mut datums)?;
}
}
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 0aa69d7..745163e 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::table::{python_pk_to_generic_row, python_to_generic_row};
+use crate::table::{python_to_generic_row, python_to_sparse_generic_row};
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::{Arc, Mutex};
@@ -46,6 +46,8 @@ struct UpsertWriterInner {
/// Lazily initialized writer - created on first write operation
writer: Mutex<Option<Arc<fcore::client::UpsertWriter>>>,
table_info: fcore::metadata::TableInfo,
+ /// Column indices for partial updates (None = full row)
+ target_columns: Option<Vec<usize>>,
}
#[pymethods]
@@ -62,7 +64,11 @@ impl UpsertWriter {
/// For dict: keys are column names, values are column values.
/// For list/tuple: values must be in schema order.
pub fn upsert(&self, row: &Bound<'_, PyAny>) ->
PyResult<WriteResultHandle> {
- let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
+ let generic_row = if let Some(target_cols) =
&self.inner.target_columns {
+ python_to_sparse_generic_row(row, &self.inner.table_info,
target_cols)?
+ } else {
+ python_to_generic_row(row, &self.inner.table_info)?
+ };
let writer = self.inner.get_or_create_writer()?;
let result_future = writer
@@ -80,7 +86,8 @@ impl UpsertWriter {
/// For dict: keys are PK column names.
/// For list/tuple: values in PK column order.
pub fn delete(&self, pk: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle>
{
- let generic_row = python_pk_to_generic_row(pk,
&self.inner.table_info)?;
+ let pk_indices =
self.inner.table_info.get_schema().primary_key_indexes();
+ let generic_row = python_to_sparse_generic_row(pk,
&self.inner.table_info, &pk_indices)?;
let writer = self.inner.get_or_create_writer()?;
let result_future = writer
@@ -134,15 +141,26 @@ impl UpsertWriter {
columns: Option<Vec<String>>,
column_indices: Option<Vec<usize>>,
) -> PyResult<Self> {
- // Apply partial update configuration if specified
- let table_upsert = if let Some(cols) = columns {
- let col_refs: Vec<&str> = cols.iter().map(|s|
s.as_str()).collect();
- table_upsert
- .partial_update_with_column_names(&col_refs)
- .map_err(|e| FlussError::new_err(e.to_string()))?
- } else if let Some(indices) = column_indices {
+ // Resolve target column indices (names → indices, or use provided indices directly)
+ let target_columns = if let Some(cols) = columns {
+ let row_type = table_info.row_type();
+ Some(
+ cols.iter()
+ .map(|name| {
+ row_type
+ .get_field_index(name)
+ .ok_or_else(||
FlussError::new_err(format!("Unknown column: {name}")))
+ })
+ .collect::<PyResult<Vec<usize>>>()?,
+ )
+ } else {
+ column_indices
+ };
+
+ // Apply partial update to the Rust core using resolved indices
+ let table_upsert = if let Some(ref indices) = target_columns {
table_upsert
- .partial_update(Some(indices))
+ .partial_update(Some(indices.clone()))
.map_err(|e| FlussError::new_err(e.to_string()))?
} else {
table_upsert
@@ -153,6 +171,7 @@ impl UpsertWriter {
table_upsert,
writer: Mutex::new(None),
table_info,
+ target_columns,
}),
})
}
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 37e9b45..62f51b4 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -22,6 +22,7 @@ use crate::metadata::{TableInfo, TablePath};
use std::sync::Arc;
pub const EARLIEST_OFFSET: i64 = -2;
+pub const LATEST_OFFSET: i64 = -1;
mod append;
mod lookup;