This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e1e7ba3  chore: cleanup and fix partial update in python (#292)
e1e7ba3 is described below

commit e1e7ba313ed738e842eec0fe55e396abd722639f
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 07:14:26 2026 +0000

    chore: cleanup and fix partial update in python (#292)
---
 bindings/python/example/example.py   |  43 ++++++++
 bindings/python/fluss/__init__.pyi   |   7 +-
 bindings/python/src/lib.rs           |   1 +
 bindings/python/src/lookup.rs        |   5 +-
 bindings/python/src/table.rs         | 195 +++++++++--------------------------
 bindings/python/src/upsert.rs        |  41 ++++++--
 crates/fluss/src/client/table/mod.rs |   1 +
 7 files changed, 128 insertions(+), 165 deletions(-)
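
For reference, the partial-update API exercised by the new example can be used roughly as follows (a minimal sketch, assuming a primary-key table pk_table with columns user_id, name, balance as in the bundled example.py; client setup and error handling are omitted):

    from decimal import Decimal

    async def partial_update_demo(pk_table):
        # Update only user_id (the PK) and balance; other columns keep their values.
        writer = pk_table.new_upsert(columns=["user_id", "balance"])
        handle = writer.upsert({"user_id": 1, "balance": Decimal("9999.99")})
        await handle.wait()

        # Same idea, addressing columns by index: 0=user_id (PK), 1=name.
        writer_idx = pk_table.new_upsert(column_indices=[0, 1])
        handle = writer_idx.upsert([1, "Alice Renamed"])
        await handle.wait()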

diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 732b7df..9f8cafa 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -569,6 +569,49 @@ async def main():
         print(f"Error during delete: {e}")
         traceback.print_exc()
 
+    # --- Test Partial Update by column names ---
+    print("\n--- Testing Partial Update (by column names) ---")
+    try:
+        partial_writer = pk_table.new_upsert(columns=["user_id", "balance"])
+        handle = partial_writer.upsert({"user_id": 1, "balance": Decimal("9999.99")})
+        await handle.wait()
+        print("Partial update: set balance=9999.99 for user_id=1")
+
+        lookuper = pk_table.new_lookup()
+        result = await lookuper.lookup({"user_id": 1})
+        if result:
+            print(f"Partial update verified:"
+                  f"\n  name={result['name']} (unchanged)"
+                  f"\n  balance={result['balance']} (updated)")
+        else:
+            print("ERROR: Expected to find user_id=1")
+
+    except Exception as e:
+        print(f"Error during partial update by names: {e}")
+        traceback.print_exc()
+
+    # --- Test Partial Update by column indices ---
+    print("\n--- Testing Partial Update (by column indices) ---")
+    try:
+        # Columns: 0=user_id (PK), 1=name — update name only
+        partial_writer_idx = pk_table.new_upsert(column_indices=[0, 1])
+        handle = partial_writer_idx.upsert([1, "Alice Renamed"])
+        await handle.wait()
+        print("Partial update by indices: set name='Alice Renamed' for user_id=1")
+
+        lookuper = pk_table.new_lookup()
+        result = await lookuper.lookup({"user_id": 1})
+        if result:
+            print(f"Partial update by indices verified:"
+                  f"\n  name={result['name']} (updated)"
+                  f"\n  balance={result['balance']} (unchanged)")
+        else:
+            print("ERROR: Expected to find user_id=1")
+
+    except Exception as e:
+        print(f"Error during partial update by indices: {e}")
+        traceback.print_exc()
+
     # Demo: Column projection using builder pattern
     print("\n--- Testing Column Projection ---")
     try:
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index a9ef828..cc7053e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -378,15 +378,14 @@ class AppendWriter:
             WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for acknowledgement.
 
         Supported Types:
-            Currently supports primitive types only:
             - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
             - Float, Double (floating point)
             - String, Char (text)
             - Bytes, Binary (binary data)
+            - Date, Time, Timestamp, TimestampLTZ (temporal)
+            - Decimal (arbitrary precision)
             - Null values
 
-            Temporal types (Date, Timestamp, Decimal) are not yet supported.
-
         Example:
             writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
             writer.append([1, 'Alice', 95.5])
@@ -712,5 +711,7 @@ class OffsetType:
 
 # Constant for earliest offset (-2)
 EARLIEST_OFFSET: int
+# Constant for latest offset (-1)
+LATEST_OFFSET: int
 
 __version__: str
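
Per the updated docstring, temporal and decimal values can now be passed directly to the writer. A minimal sketch (assuming a writer for a log table whose schema contains such columns; the column names here are illustrative, not part of the commit):

    from datetime import date, datetime
    from decimal import Decimal

    async def append_demo(writer):
        # Dict form; a list/tuple in schema order works the same way.
        handle = writer.append({
            "id": 1,
            "event_date": date(2026, 2, 9),                  # Date
            "event_time": datetime(2026, 2, 9, 7, 14, 26),   # Timestamp
            "amount": Decimal("9999.99"),                    # Decimal
        })
        await handle.wait()
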
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 41f8de5..094dc00 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -96,6 +96,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
 
     // Register constants
     m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
+    m.add("LATEST_OFFSET", fcore::client::LATEST_OFFSET)?;
 
     // Register exception types
     m.add_class::<FlussError>()?;
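
Both sentinel offsets are now exported at module level; their values come from the Rust core shown at the end of this diff (a trivial sketch, assuming the package is imported as fluss):

    import fluss

    # Sentinel offsets registered by the binding.
    assert fluss.EARLIEST_OFFSET == -2
    assert fluss.LATEST_OFFSET == -1
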
diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
index 8d91a61..e5c1f62 100644
--- a/bindings/python/src/lookup.rs
+++ b/bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
+use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -52,7 +52,8 @@ impl Lookuper {
         py: Python<'py>,
         pk: &Bound<'_, PyAny>,
     ) -> PyResult<Bound<'py, PyAny>> {
-        let generic_row = python_pk_to_generic_row(pk, &self.table_info)?;
+        let pk_indices = self.table_info.get_schema().primary_key_indexes();
+        let generic_row = python_to_sparse_generic_row(pk, &self.table_info, &pk_indices)?;
         let inner = self.inner.clone();
         let table_info = self.table_info.clone();
 
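Since the lookup path now goes through the shared sparse-row conversion over the primary-key indices, the key can be supplied either as a dict keyed by PK column names or as a list/tuple in PK column order (a sketch, same pk_table assumption as above):

    async def lookup_demo(pk_table):
        lookuper = pk_table.new_lookup()
        by_name = await lookuper.lookup({"user_id": 1})
        by_position = await lookuper.lookup([1])   # values in PK column order
        # Both forms resolve the same row (or None if the key is absent).
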
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index cb203dc..81acf00 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -639,124 +639,51 @@ enum RowInput<'py> {
     List(Bound<'py, pyo3::types::PyList>),
 }
 
-/// Helper function to process sequence types (list/tuple) into datums
-fn process_sequence_to_datums<'a, I>(
-    values: I,
-    len: usize,
-    fields: &[fcore::metadata::DataField],
-) -> PyResult<Vec<fcore::row::Datum<'static>>>
-where
-    I: Iterator<Item = Bound<'a, PyAny>>,
-{
-    if len != fields.len() {
-        return Err(FlussError::new_err(format!(
-            "Expected {} values, got {}",
-            fields.len(),
-            len
-        )));
-    }
-
-    let mut datums = Vec::with_capacity(fields.len());
-    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
-        datums.push(
-            python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
-            })?,
-        );
-    }
-    Ok(datums)
-}
-
-/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema columns.
 pub fn python_to_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
 ) -> PyResult<fcore::row::GenericRow<'static>> {
-    // Extract with user-friendly error message
-    let row_input: RowInput = row.extract().map_err(|_| {
-        let type_name = row
-            .get_type()
-            .name()
-            .map(|n| n.to_string())
-            .unwrap_or_else(|_| "unknown".to_string());
-        FlussError::new_err(format!(
-            "Row must be a dict, list, or tuple; got {type_name}"
-        ))
-    })?;
-    let schema = table_info.row_type();
-    let fields = schema.fields();
-
-    let datums = match row_input {
-        RowInput::Dict(dict) => {
-            // Strict: reject unknown keys (and also reject non-str keys nicely)
-            for (k, _) in dict.iter() {
-                let key_str = k.extract::<&str>().map_err(|_| {
-                    let key_type = k
-                        .get_type()
-                        .name()
-                        .map(|n| n.to_string())
-                        .unwrap_or_else(|_| "unknown".to_string());
-                    FlussError::new_err(format!("Row dict keys must be strings; got {key_type}"))
-                })?;
-
-                if fields.iter().all(|f| f.name() != key_str) {
-                    let expected = fields
-                        .iter()
-                        .map(|f| f.name())
-                        .collect::<Vec<_>>()
-                        .join(", ");
-                    return Err(FlussError::new_err(format!(
-                        "Unknown field '{key_str}'. Expected fields: {expected}"
-                    )));
-                }
-            }
-
-            let mut datums = Vec::with_capacity(fields.len());
-            for field in fields {
-                let value = dict.get_item(field.name())?.ok_or_else(|| {
-                    FlussError::new_err(format!("Missing field: {}", field.name()))
-                })?;
-                datums.push(
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
-                    })?,
-                );
-            }
-            datums
-        }
-
-        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
-
-        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
-    };
+    let all_indices: Vec<usize> = (0..table_info.row_type().fields().len()).collect();
+    python_to_sparse_generic_row(row, table_info, &all_indices)
+}
 
-    Ok(fcore::row::GenericRow { values: datums })
+/// Process a Python sequence (list or tuple) into datums at the target column positions.
+fn process_sequence(
+    seq: &Bound<pyo3::types::PySequence>,
+    target_indices: &[usize],
+    fields: &[fcore::metadata::DataField],
+    datums: &mut [fcore::row::Datum<'static>],
+) -> PyResult<()> {
+    if seq.len()? != target_indices.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} elements, got {}",
+            target_indices.len(),
+            seq.len()?
+        )));
+    }
+    for (i, &col_idx) in target_indices.iter().enumerate() {
+        let field = &fields[col_idx];
+        let value = seq.get_item(i)?;
+        datums[col_idx] = python_value_to_datum(&value, field.data_type())
+            .map_err(|e| FlussError::new_err(format!("Field '{}': {}", field.name(), e)))?;
+    }
+    Ok(())
 }
 
-/// Convert Python primary key values (dict/list/tuple) to GenericRow.
-/// Only requires PK columns; non-PK columns are filled with Null.
-/// For dict: keys should be PK column names.
-/// For list/tuple: values should be PK values in PK column order.
-pub fn python_pk_to_generic_row(
+/// Build a full-width GenericRow filling only the specified column
+/// indices from user input; all other columns are set to Null.
+pub fn python_to_sparse_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
 ) -> PyResult<fcore::row::GenericRow<'static>> {
-    let schema = table_info.get_schema();
     let row_type = table_info.row_type();
     let fields = row_type.fields();
-    let pk_indexes = schema.primary_key_indexes();
-    let pk_names: Vec<&str> = schema.primary_key_column_names();
-
-    if pk_indexes.is_empty() {
-        return Err(FlussError::new_err(
-            "Table has no primary key; cannot use PK-only row",
-        ));
-    }
+    let target_names: Vec<&str> = target_indices.iter().map(|&i| fields[i].name()).collect();
 
-    // Initialize all datums as Null
     let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; fields.len()];
 
-    // Extract with user-friendly error message
     let row_input: RowInput = row.extract().map_err(|_| {
         let type_name = row
             .get_type()
@@ -764,13 +691,12 @@ pub fn python_pk_to_generic_row(
             .map(|n| n.to_string())
             .unwrap_or_else(|_| "unknown".to_string());
         FlussError::new_err(format!(
-            "PK row must be a dict, list, or tuple; got {type_name}"
+            "Row must be a dict, list, or tuple; got {type_name}"
         ))
     })?;
 
     match row_input {
         RowInput::Dict(dict) => {
-            // Validate keys are PK columns
             for (k, _) in dict.iter() {
                 let key_str = k.extract::<&str>().map_err(|_| {
                     let key_type = k
@@ -778,64 +704,35 @@ pub fn python_pk_to_generic_row(
                         .name()
                         .map(|n| n.to_string())
                         .unwrap_or_else(|_| "unknown".to_string());
-                    FlussError::new_err(format!("PK dict keys must be strings; got {key_type}"))
+                    FlussError::new_err(format!("Dict keys must be strings; got {key_type}"))
                 })?;
-
-                if !pk_names.contains(&key_str) {
+                if !target_names.contains(&key_str) {
                     return Err(FlussError::new_err(format!(
-                        "Unknown PK field '{}'. Expected PK fields: {}",
+                        "Unknown field '{}'. Expected: {}",
                         key_str,
-                        pk_names.join(", ")
+                        target_names.join(", ")
                     )));
                 }
             }
-
-            // Extract PK values
-            for (i, pk_idx) in pk_indexes.iter().enumerate() {
-                let pk_name = pk_names[i];
-                let field: &fcore::metadata::DataField = &fields[*pk_idx];
+            for (i, &col_idx) in target_indices.iter().enumerate() {
+                let name = target_names[i];
+                let field = &fields[col_idx];
                 let value = dict
-                    .get_item(pk_name)?
-                    .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?;
-                datums[*pk_idx] = python_value_to_datum(&value, field.data_type())
-                    .map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?;
+                    .get_item(name)?
+                    .ok_or_else(|| FlussError::new_err(format!("Missing field: {name}")))?;
+                datums[col_idx] = python_value_to_datum(&value, field.data_type())
+                    .map_err(|e| FlussError::new_err(format!("Field '{name}': {e}")))?;
             }
         }
 
         RowInput::List(list) => {
-            if list.len() != pk_indexes.len() {
-                return Err(FlussError::new_err(format!(
-                    "PK list must have {} elements (PK columns), got {}",
-                    pk_indexes.len(),
-                    list.len()
-                )));
-            }
-            for (i, pk_idx) in pk_indexes.iter().enumerate() {
-                let field: &fcore::metadata::DataField = &fields[*pk_idx];
-                let value = list.get_item(i)?;
-                datums[*pk_idx] =
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!("PK field '{}': {}", field.name(), e))
-                    })?;
-            }
+            let seq = list.as_sequence();
+            process_sequence(seq, target_indices, fields, &mut datums)?;
         }
 
         RowInput::Tuple(tuple) => {
-            if tuple.len() != pk_indexes.len() {
-                return Err(FlussError::new_err(format!(
-                    "PK tuple must have {} elements (PK columns), got {}",
-                    pk_indexes.len(),
-                    tuple.len()
-                )));
-            }
-            for (i, pk_idx) in pk_indexes.iter().enumerate() {
-                let field: &fcore::metadata::DataField = &fields[*pk_idx];
-                let value = tuple.get_item(i)?;
-                datums[*pk_idx] =
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!("PK field '{}': {}", field.name(), e))
-                    })?;
-            }
+            let seq = tuple.as_sequence();
+            process_sequence(seq, target_indices, fields, &mut datums)?;
         }
     }
 
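Because the full-row converter now delegates to the sparse converter with every column index as the target, error handling is shared between the full and partial paths; for example, an unknown dict key is rejected the same way in both (a sketch of what a caller would see, assuming the three-column pk_table from above):

    writer = pk_table.new_upsert()   # full-row writer
    try:
        writer.upsert({"user_id": 1, "balanc": 1})   # typo in column name
    except Exception as e:
        print(e)   # roughly: Unknown field 'balanc'. Expected: user_id, name, balance
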
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 0aa69d7..745163e 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::table::{python_pk_to_generic_row, python_to_generic_row};
+use crate::table::{python_to_generic_row, python_to_sparse_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::{Arc, Mutex};
@@ -46,6 +46,8 @@ struct UpsertWriterInner {
     /// Lazily initialized writer - created on first write operation
     writer: Mutex<Option<Arc<fcore::client::UpsertWriter>>>,
     table_info: fcore::metadata::TableInfo,
+    /// Column indices for partial updates (None = full row)
+    target_columns: Option<Vec<usize>>,
 }
 
 #[pymethods]
@@ -62,7 +64,11 @@ impl UpsertWriter {
     ///          For dict: keys are column names, values are column values.
     ///          For list/tuple: values must be in schema order.
     pub fn upsert(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
-        let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
+        let generic_row = if let Some(target_cols) = &self.inner.target_columns {
+            python_to_sparse_generic_row(row, &self.inner.table_info, target_cols)?
+        } else {
+            python_to_generic_row(row, &self.inner.table_info)?
+        };
 
         let writer = self.inner.get_or_create_writer()?;
         let result_future = writer
@@ -80,7 +86,8 @@ impl UpsertWriter {
     ///         For dict: keys are PK column names.
     ///         For list/tuple: values in PK column order.
     pub fn delete(&self, pk: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
-        let generic_row = python_pk_to_generic_row(pk, &self.inner.table_info)?;
+        let pk_indices = self.inner.table_info.get_schema().primary_key_indexes();
+        let generic_row = python_to_sparse_generic_row(pk, &self.inner.table_info, &pk_indices)?;
 
         let writer = self.inner.get_or_create_writer()?;
         let result_future = writer
@@ -134,15 +141,26 @@ impl UpsertWriter {
         columns: Option<Vec<String>>,
         column_indices: Option<Vec<usize>>,
     ) -> PyResult<Self> {
-        // Apply partial update configuration if specified
-        let table_upsert = if let Some(cols) = columns {
-            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
-            table_upsert
-                .partial_update_with_column_names(&col_refs)
-                .map_err(|e| FlussError::new_err(e.to_string()))?
-        } else if let Some(indices) = column_indices {
+        // Resolve target column indices (names → indices, or use provided indices directly)
+        let target_columns = if let Some(cols) = columns {
+            let row_type = table_info.row_type();
+            Some(
+                cols.iter()
+                    .map(|name| {
+                        row_type
+                            .get_field_index(name)
+                            .ok_or_else(|| FlussError::new_err(format!("Unknown column: {name}")))
+                    })
+                    .collect::<PyResult<Vec<usize>>>()?,
+            )
+        } else {
+            column_indices
+        };
+
+        // Apply partial update to the Rust core using resolved indices
+        let table_upsert = if let Some(ref indices) = target_columns {
             table_upsert
-                .partial_update(Some(indices))
+                .partial_update(Some(indices.clone()))
                 .map_err(|e| FlussError::new_err(e.to_string()))?
         } else {
             table_upsert
@@ -153,6 +171,7 @@ impl UpsertWriter {
                 table_upsert,
                 writer: Mutex::new(None),
                 table_info,
+                target_columns,
             }),
         })
     }
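
delete() keeps its PK-only contract but is now implemented through the same sparse-row helper used by lookup (a sketch, same pk_table assumption as above):

    async def delete_demo(pk_table):
        writer = pk_table.new_upsert()
        handle = writer.delete({"user_id": 1})   # dict keyed by PK column names
        await handle.wait()
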
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 37e9b45..62f51b4 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -22,6 +22,7 @@ use crate::metadata::{TableInfo, TablePath};
 use std::sync::Arc;
 
 pub const EARLIEST_OFFSET: i64 = -2;
+pub const LATEST_OFFSET: i64 = -1;
 
 mod append;
 mod lookup;
