This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 31d5df5  feat: introduce kv tables support in python (#239)
31d5df5 is described below

commit 31d5df5d6d62cf5a7b208e8c8a03bee9730e5826
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 4 15:15:26 2026 +0000

    feat: introduce kv tables support in python (#239)
---
 bindings/python/example/example.py       | 305 +++++++++++++++++++---
 bindings/python/fluss/__init__.pyi       |  81 +++++-
 bindings/python/src/lib.rs               |   6 +
 bindings/python/src/lookup.rs            | 111 ++++++++
 bindings/python/src/table.rs             | 417 ++++++++++++++++++++++++++++++-
 bindings/python/src/upsert.rs            | 188 ++++++++++++++
 crates/fluss/src/client/table/lookup.rs  |  50 ++--
 crates/fluss/src/client/table/mod.rs     |   4 +-
 crates/fluss/src/client/table/scanner.rs |   3 +-
 crates/fluss/src/row/encode/mod.rs       |   4 +-
 10 files changed, 1101 insertions(+), 68 deletions(-)

diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 5d0302e..c359425 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -16,8 +16,9 @@
 # under the License.
 
 import asyncio
-import time
-from datetime import date, time as dt_time, datetime
+import traceback
+from datetime import date, datetime
+from datetime import time as dt_time
 from decimal import Decimal
 
 import pandas as pd
@@ -103,11 +104,34 @@ async def main():
                 pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                 pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                 pa.array([25, 30, 35], type=pa.int32()),
-                pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 
8)], type=pa.date32()),
-                pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 
0)], type=pa.time32("ms")),
-                pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")),
-                pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")),
-                pa.array([Decimal("75000.00"), Decimal("82000.50"), 
Decimal("95000.75")], type=pa.decimal128(10, 2)),
+                pa.array(
+                    [date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)],
+                    type=pa.date32(),
+                ),
+                pa.array(
+                    [dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)],
+                    type=pa.time32("ms"),
+                ),
+                pa.array(
+                    [
+                        datetime(2024, 1, 15, 10, 30),
+                        datetime(2024, 1, 15, 11, 0),
+                        datetime(2024, 1, 15, 11, 30),
+                    ],
+                    type=pa.timestamp("us"),
+                ),
+                pa.array(
+                    [
+                        datetime(2024, 1, 15, 10, 30),
+                        datetime(2024, 1, 15, 11, 0),
+                        datetime(2024, 1, 15, 11, 30),
+                    ],
+                    type=pa.timestamp("us", tz="UTC"),
+                ),
+                pa.array(
+                    [Decimal("75000.00"), Decimal("82000.50"), 
Decimal("95000.75")],
+                    type=pa.decimal128(10, 2),
+                ),
             ],
             schema=schema,
         )
@@ -125,9 +149,18 @@ async def main():
                 pa.array([28, 32], type=pa.int32()),
                 pa.array([date(1996, 7, 22), date(1992, 12, 1)], 
type=pa.date32()),
                 pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], 
type=pa.time32("ms")),
-                pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 
9, 30)], type=pa.timestamp("us")),
-                pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 
9, 30)], type=pa.timestamp("us", tz="UTC")),
-                pa.array([Decimal("68000.00"), Decimal("72500.25")], 
type=pa.decimal128(10, 2)),
+                pa.array(
+                    [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 
30)],
+                    type=pa.timestamp("us"),
+                ),
+                pa.array(
+                    [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 
30)],
+                    type=pa.timestamp("us", tz="UTC"),
+                ),
+                pa.array(
+                    [Decimal("68000.00"), Decimal("72500.25")],
+                    type=pa.decimal128(10, 2),
+                ),
             ],
             schema=schema,
         )
@@ -138,28 +171,35 @@ async def main():
         # Test 3: Append single rows with Date, Time, Timestamp, Decimal
         print("\n--- Testing single row append with temporal/decimal types 
---")
         # Dict input with all types including Date, Time, Timestamp, Decimal
-        await append_writer.append({
-            "id": 8,
-            "name": "Helen",
-            "score": 93.5,
-            "age": 26,
-            "birth_date": date(1998, 4, 10),
-            "check_in_time": dt_time(11, 30, 45),
-            "created_at": datetime(2024, 1, 17, 14, 0, 0),
-            "updated_at": datetime(2024, 1, 17, 14, 0, 0),
-            "salary": Decimal("88000.00"),
-        })
+        await append_writer.append(
+            {
+                "id": 8,
+                "name": "Helen",
+                "score": 93.5,
+                "age": 26,
+                "birth_date": date(1998, 4, 10),
+                "check_in_time": dt_time(11, 30, 45),
+                "created_at": datetime(2024, 1, 17, 14, 0, 0),
+                "updated_at": datetime(2024, 1, 17, 14, 0, 0),
+                "salary": Decimal("88000.00"),
+            }
+        )
         print("Successfully appended row (dict with Date, Time, Timestamp, 
Decimal)")
 
         # List input with all types
-        await append_writer.append([
-            9, "Ivan", 90.0, 31,
-            date(1993, 8, 25),
-            dt_time(16, 45, 0),
-            datetime(2024, 1, 17, 15, 30, 0),
-            datetime(2024, 1, 17, 15, 30, 0),
-            Decimal("91500.50"),
-        ])
+        await append_writer.append(
+            [
+                9,
+                "Ivan",
+                90.0,
+                31,
+                date(1993, 8, 25),
+                dt_time(16, 45, 0),
+                datetime(2024, 1, 17, 15, 30, 0),
+                datetime(2024, 1, 17, 15, 30, 0),
+                Decimal("91500.50"),
+            ]
+        )
         print("Successfully appended row (list with Date, Time, Timestamp, 
Decimal)")
 
         # Test 4: Write Pandas DataFrame
@@ -172,8 +212,14 @@ async def main():
                 "age": [29, 27],
                 "birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
                 "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
-                "created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 
18, 8, 30)],
-                "updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 
18, 8, 30)],
+                "created_at": [
+                    datetime(2024, 1, 18, 8, 0),
+                    datetime(2024, 1, 18, 8, 30),
+                ],
+                "updated_at": [
+                    datetime(2024, 1, 18, 8, 0),
+                    datetime(2024, 1, 18, 8, 30),
+                ],
                 "salary": [Decimal("79000.00"), Decimal("85500.75")],
             }
         )
@@ -249,6 +295,199 @@ async def main():
     except Exception as e:
         print(f"Error during scanning: {e}")
 
+    # =====================================================
+    # Demo: Primary Key Table with Lookup and Upsert
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Primary Key Table (Lookup & Upsert) ---")
+    print("=" * 60)
+
+    # Create a primary key table for lookup/upsert tests
+    # Include temporal and decimal types to test full conversion
+    pk_table_fields = [
+        pa.field("user_id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("email", pa.string()),
+        pa.field("age", pa.int32()),
+        pa.field("birth_date", pa.date32()),
+        pa.field("login_time", pa.time32("ms")),
+        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
+        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
+        pa.field("balance", pa.decimal128(10, 2)),
+    ]
+    pk_schema = pa.schema(pk_table_fields)
+    fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
+
+    # Create table descriptor
+    pk_table_descriptor = fluss.TableDescriptor(
+        fluss_pk_schema,
+        bucket_count=3,
+    )
+
+    pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
+
+    try:
+        await admin.create_table(pk_table_path, pk_table_descriptor, True)
+        print(f"Created PK table: {pk_table_path}")
+    except Exception as e:
+        print(f"PK Table creation failed (may already exist): {e}")
+
+    # Get the PK table
+    pk_table = await conn.get_table(pk_table_path)
+    print(f"Got PK table: {pk_table}")
+    print(f"Has primary key: {pk_table.has_primary_key()}")
+
+    # --- Test Upsert ---
+    print("\n--- Testing Upsert ---")
+    try:
+        upsert_writer = pk_table.new_upsert()
+        print(f"Created upsert writer: {upsert_writer}")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 1,
+                "name": "Alice",
+                "email": "[email protected]",
+                "age": 25,
+                "birth_date": date(1999, 5, 15),
+                "login_time": dt_time(9, 30, 45, 123000),  # 09:30:45.123
+                "created_at": datetime(
+                    2024, 1, 15, 10, 30, 45, 123456
+                ),  # with microseconds
+                "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456),
+                "balance": Decimal("1234.56"),
+            }
+        )
+        print("Upserted user_id=1 (Alice)")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 2,
+                "name": "Bob",
+                "email": "[email protected]",
+                "age": 30,
+                "birth_date": date(1994, 3, 20),
+                "login_time": dt_time(14, 15, 30, 500000),  # 14:15:30.500
+                "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+                "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+                "balance": Decimal("5678.91"),
+            }
+        )
+        print("Upserted user_id=2 (Bob)")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 3,
+                "name": "Charlie",
+                "email": "[email protected]",
+                "age": 35,
+                "birth_date": date(1989, 11, 8),
+                "login_time": dt_time(16, 45, 59, 999000),  # 16:45:59.999
+                "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+                "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+                "balance": Decimal("9876.54"),
+            }
+        )
+        print("Upserted user_id=3 (Charlie)")
+
+        # Update an existing row (same PK, different values)
+        await upsert_writer.upsert(
+            {
+                "user_id": 1,
+                "name": "Alice Updated",
+                "email": "[email protected]",
+                "age": 26,
+                "birth_date": date(1999, 5, 15),
+                "login_time": dt_time(10, 11, 12, 345000),  # 10:11:12.345
+                "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456),  # 
unchanged
+                "updated_at": datetime(
+                    2024, 1, 20, 15, 45, 30, 678901
+                ),  # new update time
+                "balance": Decimal("2345.67"),
+            }
+        )
+        print("Updated user_id=1 (Alice -> Alice Updated)")
+
+        # Explicit flush to ensure all upserts are acknowledged
+        await upsert_writer.flush()
+        print("Flushed all upserts")
+
+    except Exception as e:
+        print(f"Error during upsert: {e}")
+        traceback.print_exc()
+
+    # --- Test Lookup ---
+    print("\n--- Testing Lookup ---")
+    try:
+        lookuper = pk_table.new_lookup()
+        print(f"Created lookuper: {lookuper}")
+
+        result = await lookuper.lookup({"user_id": 1})
+        if result:
+            print("Lookup user_id=1: Found!")
+            print(f"  name: {result['name']}")
+            print(f"  email: {result['email']}")
+            print(f"  age: {result['age']}")
+            print(
+                f"  birth_date: {result['birth_date']} (type: 
{type(result['birth_date']).__name__})"
+            )
+            print(
+                f"  login_time: {result['login_time']} (type: 
{type(result['login_time']).__name__})"
+            )
+            print(
+                f"  created_at: {result['created_at']} (type: 
{type(result['created_at']).__name__})"
+            )
+            print(
+                f"  updated_at: {result['updated_at']} (type: 
{type(result['updated_at']).__name__})"
+            )
+            print(
+                f"  balance: {result['balance']} (type: 
{type(result['balance']).__name__})"
+            )
+        else:
+            print("Lookup user_id=1: Not found")
+
+        # Lookup another row
+        result = await lookuper.lookup({"user_id": 2})
+        if result:
+            print(f"Lookup user_id=2: Found! -> {result}")
+        else:
+            print("Lookup user_id=2: Not found")
+
+        # Lookup non-existent row
+        result = await lookuper.lookup({"user_id": 999})
+        if result:
+            print(f"Lookup user_id=999: Found! -> {result}")
+        else:
+            print("Lookup user_id=999: Not found (as expected)")
+
+    except Exception as e:
+        print(f"Error during lookup: {e}")
+        traceback.print_exc()
+
+    # --- Test Delete ---
+    print("\n--- Testing Delete ---")
+    try:
+        upsert_writer = pk_table.new_upsert()
+
+        # Delete only needs PK columns - much simpler API!
+        await upsert_writer.delete({"user_id": 3})
+        print("Deleted user_id=3")
+
+        # Explicit flush to ensure delete is acknowledged
+        await upsert_writer.flush()
+        print("Flushed delete")
+
+        lookuper = pk_table.new_lookup()
+        result = await lookuper.lookup({"user_id": 3})
+        if result:
+            print(f"Lookup user_id=3 after delete: Still found! -> {result}")
+        else:
+            print("Lookup user_id=3 after delete: Not found (deletion 
confirmed)")
+
+    except Exception as e:
+        print(f"Error during delete: {e}")
+        traceback.print_exc()
+
     # Demo: Column projection
     print("\n--- Testing Column Projection ---")
     try:
@@ -258,7 +497,9 @@ async def main():
         scanner_index.subscribe(None, None)
         df_projected = scanner_index.to_pandas()
         print(df_projected.head())
-        print(f"   Projected {df_projected.shape[1]} columns: 
{list(df_projected.columns)}")
+        print(
+            f"   Projected {df_projected.shape[1]} columns: 
{list(df_projected.columns)}"
+        )
 
         # Project specific columns by name (Pythonic!)
         print("\n2. Projection by name ['name', 'score'] (Pythonic):")
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index 6073070..c911ebe 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -45,7 +45,12 @@ class FlussConnection:
     async def get_table(self, table_path: TablePath) -> FlussTable: ...
     def close(self) -> None: ...
     def __enter__(self) -> FlussConnection: ...
-    def __exit__(self, exc_type: Optional[type], exc_value: 
Optional[BaseException], traceback: Optional[TracebackType]) -> bool: ...
+    def __exit__(
+        self,
+        exc_type: Optional[type],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> bool: ...
     def __repr__(self) -> str: ...
 
 class FlussAdmin:
@@ -61,7 +66,17 @@ class FlussAdmin:
 
 class FlussTable:
     async def new_append_writer(self) -> AppendWriter: ...
-    async def new_log_scanner(self) -> LogScanner: ...
+    async def new_log_scanner(
+        self,
+        project: Optional[List[int]] = None,
+        columns: Optional[List[str]] = None,
+    ) -> LogScanner: ...
+    def new_upsert(
+        self,
+        columns: Optional[List[str]] = None,
+        column_indices: Optional[List[int]] = None,
+    ) -> UpsertWriter: ...
+    def new_lookup(self) -> Lookuper: ...
     def get_table_info(self) -> TableInfo: ...
     def get_table_path(self) -> TablePath: ...
     def has_primary_key(self) -> bool: ...
@@ -100,6 +115,49 @@ class AppendWriter:
     def flush(self) -> None: ...
     def __repr__(self) -> str: ...
 
+class UpsertWriter:
+    """Writer for upserting and deleting data in a Fluss primary key table."""
+
+    async def upsert(self, row: dict | list | tuple) -> None:
+        """Upsert a row into the table.
+
+        If a row with the same primary key exists, it will be updated.
+        Otherwise, a new row will be inserted.
+
+        Args:
+            row: Dictionary mapping field names to values, or
+                 list/tuple of values in schema order
+        """
+        ...
+    async def delete(self, pk: dict | list | tuple) -> None:
+        """Delete a row from the table by primary key.
+
+        Args:
+            pk: Dictionary with PK column names as keys, or
+                list/tuple of PK values in PK column order
+        """
+        ...
+    async def flush(self) -> None:
+        """Flush all pending upsert/delete operations to the server."""
+        ...
+    def __repr__(self) -> str: ...
+
+class Lookuper:
+    """Lookuper for performing primary key lookups on a Fluss table."""
+
+    async def lookup(self, pk: dict | list | tuple) -> Optional[Dict[str, 
object]]:
+        """Lookup a row by its primary key.
+
+        Args:
+            pk: Dictionary with PK column names as keys, or
+                list/tuple of PK values in PK column order
+
+        Returns:
+            A dict containing the row data if found, None otherwise.
+        """
+        ...
+    def __repr__(self) -> str: ...
+
 class LogScanner:
     def subscribe(
         self, start_timestamp: Optional[int], end_timestamp: Optional[int]
@@ -109,14 +167,27 @@ class LogScanner:
     def __repr__(self) -> str: ...
 
 class Schema:
-    def __init__(self, schema: pa.Schema, primary_keys: Optional[List[str]] = 
None) -> None: ...
+    def __init__(
+        self, schema: pa.Schema, primary_keys: Optional[List[str]] = None
+    ) -> None: ...
     def get_column_names(self) -> List[str]: ...
     def get_column_types(self) -> List[str]: ...
-    def get_columns(self) -> List[Tuple[str,str]]: ...
+    def get_columns(self) -> List[Tuple[str, str]]: ...
     def __str__(self) -> str: ...
 
 class TableDescriptor:
-    def __init__(self, schema: Schema, **kwargs: str) -> None: ...
+    def __init__(
+        self,
+        schema: Schema,
+        *,
+        partition_keys: Optional[List[str]] = None,
+        bucket_count: Optional[int] = None,
+        bucket_keys: Optional[List[str]] = None,
+        comment: Optional[str] = None,
+        log_format: Optional[str] = None,
+        kv_format: Optional[str] = None,
+        **properties: str,
+    ) -> None: ...
     def get_schema(self) -> Schema: ...
 
 class TablePath:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 49d5179..3da0b25 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -25,16 +25,20 @@ mod admin;
 mod config;
 mod connection;
 mod error;
+mod lookup;
 mod metadata;
 mod table;
+mod upsert;
 mod utils;
 
 pub use admin::*;
 pub use config::*;
 pub use connection::*;
 pub use error::*;
+pub use lookup::*;
 pub use metadata::*;
 pub use table::*;
+pub use upsert::*;
 pub use utils::*;
 
 static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
@@ -55,6 +59,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<FlussAdmin>()?;
     m.add_class::<FlussTable>()?;
     m.add_class::<AppendWriter>()?;
+    m.add_class::<UpsertWriter>()?;
+    m.add_class::<Lookuper>()?;
     m.add_class::<Schema>()?;
     m.add_class::<LogScanner>()?;
     m.add_class::<LakeSnapshot>()?;
diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
new file mode 100644
index 0000000..8d91a61
--- /dev/null
+++ b/bindings/python/src/lookup.rs
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Lookuper for performing primary key lookups on a Fluss table.
+///
+/// The Lookuper caches key encoders and bucketing functions, making
+/// repeated lookups efficient. Create once and reuse for multiple lookups.
+///
+/// # Example:
+///     lookuper = table.new_lookup()
+///     result = await lookuper.lookup({"user_id": 1})
+///     result2 = await lookuper.lookup({"user_id": 2})  # Reuses cached 
encoders
+#[pyclass]
+pub struct Lookuper {
+    inner: Arc<Mutex<fcore::client::Lookuper>>,
+    table_info: Arc<fcore::metadata::TableInfo>,
+}
+
+#[pymethods]
+impl Lookuper {
+    /// Lookup a row by its primary key.
+    ///
+    /// Args:
+    ///     pk: A dict, list, or tuple containing only the primary key values.
+    ///         For dict: keys are PK column names.
+    ///         For list/tuple: values in PK column order.
+    ///
+    /// Returns:
+    ///     A dict containing the row data if found, None otherwise.
+    pub fn lookup<'py>(
+        &self,
+        py: Python<'py>,
+        pk: &Bound<'_, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_pk_to_generic_row(pk, &self.table_info)?;
+        let inner = self.inner.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            // Perform async lookup
+            let result = {
+                let mut lookuper = inner.lock().await;
+                lookuper
+                    .lookup(&generic_row)
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))?
+            };
+
+            // Extract row data
+            let row_opt = result
+                .get_single_row()
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            // Convert to Python with GIL
+            Python::attach(|py| match row_opt {
+                Some(compacted_row) => internal_row_to_dict(py, 
&compacted_row, &table_info),
+                None => Ok(py.None()),
+            })
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        "Lookuper()".to_string()
+    }
+}
+
+impl Lookuper {
+    /// Create a Lookuper from connection components.
+    ///
+    /// This creates the core Lookuper which caches encoders and bucketing 
functions.
+    pub fn new(
+        connection: &Arc<fcore::client::FlussConnection>,
+        metadata: Arc<fcore::client::Metadata>,
+        table_info: fcore::metadata::TableInfo,
+    ) -> PyResult<Self> {
+        let fluss_table = fcore::client::FlussTable::new(connection, metadata, 
table_info.clone());
+
+        let table_lookup = fluss_table
+            .new_lookup()
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        let lookuper = table_lookup
+            .create_lookuper()
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        Ok(Self {
+            inner: Arc::new(Mutex::new(lookuper)),
+            table_info: Arc::new(table_info),
+        })
+    }
+}
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 48f09e7..4554ca1 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -36,6 +36,7 @@ const MICROS_PER_MILLI: i64 = 1_000;
 const MICROS_PER_SECOND: i64 = 1_000_000;
 const MICROS_PER_DAY: i64 = 86_400_000_000;
 const NANOS_PER_MILLI: i64 = 1_000_000;
+const NANOS_PER_MICRO: i64 = 1_000;
 
 /// Represents a Fluss table for data operations
 #[pyclass]
@@ -128,6 +129,70 @@ impl FlussTable {
         self.has_primary_key
     }
 
+    /// Create a new lookuper for primary key lookups.
+    ///
+    /// This is only available for tables with a primary key.
+    pub fn new_lookup(&self, _py: Python) -> PyResult<crate::Lookuper> {
+        if !self.has_primary_key {
+            return Err(FlussError::new_err(
+                "Lookup is only supported for primary key tables",
+            ));
+        }
+
+        crate::Lookuper::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        )
+    }
+
+    /// Create a new upsert writer for the table.
+    ///
+    /// This is only available for tables with a primary key.
+    ///
+    /// Args:
+    ///     columns: Optional list of column names for partial update.
+    ///              Only the specified columns will be updated.
+    ///     column_indices: Optional list of column indices (0-based) for 
partial update.
+    ///                     Alternative to `columns` parameter.
+    #[pyo3(signature = (columns=None, column_indices=None))]
+    pub fn new_upsert(
+        &self,
+        _py: Python,
+        columns: Option<Vec<String>>,
+        column_indices: Option<Vec<usize>>,
+    ) -> PyResult<crate::UpsertWriter> {
+        if !self.has_primary_key {
+            return Err(FlussError::new_err(
+                "Upsert is only supported for primary key tables",
+            ));
+        }
+
+        // Validate that at most one parameter is specified
+        if columns.is_some() && column_indices.is_some() {
+            return Err(FlussError::new_err(
+                "Specify only one of 'columns' or 'column_indices', not both",
+            ));
+        }
+
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let table_upsert = fluss_table
+            .new_upsert()
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        crate::UpsertWriter::new(
+            table_upsert,
+            self.table_info.clone(),
+            columns,
+            column_indices,
+        )
+    }
+
     fn __repr__(&self) -> String {
         format!(
             "FlussTable(path={}.{})",
@@ -358,7 +423,7 @@ where
 }
 
 /// Convert Python row (dict/list/tuple) to GenericRow based on schema
-fn python_to_generic_row(
+pub fn python_to_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
 ) -> PyResult<fcore::row::GenericRow<'static>> {
@@ -423,6 +488,115 @@ fn python_to_generic_row(
     Ok(fcore::row::GenericRow { values: datums })
 }
 
+/// Convert Python primary key values (dict/list/tuple) to GenericRow.
+/// Only requires PK columns; non-PK columns are filled with Null.
+/// For dict: keys should be PK column names.
+/// For list/tuple: values should be PK values in PK column order.
+pub fn python_pk_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    let schema = table_info.get_schema();
+    let row_type = table_info.row_type();
+    let fields = row_type.fields();
+    let pk_indexes = schema.primary_key_indexes();
+    let pk_names: Vec<&str> = schema.primary_key_column_names();
+
+    if pk_indexes.is_empty() {
+        return Err(FlussError::new_err(
+            "Table has no primary key; cannot use PK-only row",
+        ));
+    }
+
+    // Initialize all datums as Null
+    let mut datums: Vec<fcore::row::Datum<'static>> = 
vec![fcore::row::Datum::Null; fields.len()];
+
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "PK row must be a dict, list, or tuple; got {type_name}"
+        ))
+    })?;
+
+    match row_input {
+        RowInput::Dict(dict) => {
+            // Validate keys are PK columns
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("PK dict keys must be strings; 
got {key_type}"))
+                })?;
+
+                if !pk_names.contains(&key_str) {
+                    return Err(FlussError::new_err(format!(
+                        "Unknown PK field '{}'. Expected PK fields: {}",
+                        key_str,
+                        pk_names.join(", ")
+                    )));
+                }
+            }
+
+            // Extract PK values
+            for (i, pk_idx) in pk_indexes.iter().enumerate() {
+                let pk_name = pk_names[i];
+                let field: &fcore::metadata::DataField = &fields[*pk_idx];
+                let value = dict
+                    .get_item(pk_name)?
+                    .ok_or_else(|| FlussError::new_err(format!("Missing PK 
field: {}", pk_name)))?;
+                datums[*pk_idx] = python_value_to_datum(&value, 
field.data_type())
+                    .map_err(|e| FlussError::new_err(format!("PK field '{}': 
{}", pk_name, e)))?;
+            }
+        }
+
+        RowInput::List(list) => {
+            if list.len() != pk_indexes.len() {
+                return Err(FlussError::new_err(format!(
+                    "PK list must have {} elements (PK columns), got {}",
+                    pk_indexes.len(),
+                    list.len()
+                )));
+            }
+            for (i, pk_idx) in pk_indexes.iter().enumerate() {
+                let field: &fcore::metadata::DataField = &fields[*pk_idx];
+                let value = list.get_item(i)?;
+                datums[*pk_idx] =
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("PK field '{}': {}", 
field.name(), e))
+                    })?;
+            }
+        }
+
+        RowInput::Tuple(tuple) => {
+            if tuple.len() != pk_indexes.len() {
+                return Err(FlussError::new_err(format!(
+                    "PK tuple must have {} elements (PK columns), got {}",
+                    pk_indexes.len(),
+                    tuple.len()
+                )));
+            }
+            for (i, pk_idx) in pk_indexes.iter().enumerate() {
+                let field: &fcore::metadata::DataField = &fields[*pk_idx];
+                let value = tuple.get_item(i)?;
+                datums[*pk_idx] =
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("PK field '{}': {}", 
field.name(), e))
+                    })?;
+            }
+        }
+    }
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
 /// Convert Python value to Datum based on data type
 fn python_value_to_datum(
     value: &Bound<PyAny>,
@@ -516,11 +690,237 @@ fn python_value_to_datum(
     }
 }
 
+/// Convert Rust Datum to Python value based on data type.
+/// This is the reverse of python_value_to_datum.
+pub fn datum_to_python_value(
+    py: Python,
+    row: &dyn fcore::row::InternalRow,
+    pos: usize,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<Py<PyAny>> {
+    use fcore::metadata::DataType;
+
+    // Check for null first
+    if row.is_null_at(pos) {
+        return Ok(py.None());
+    }
+
+    match data_type {
+        DataType::Boolean(_) => Ok(row
+            .get_boolean(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::TinyInt(_) => Ok(row
+            .get_byte(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::SmallInt(_) => Ok(row
+            .get_short(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::Int(_) => Ok(row
+            .get_int(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::BigInt(_) => Ok(row
+            .get_long(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::Float(_) => Ok(row
+            .get_float(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::Double(_) => Ok(row
+            .get_double(pos)
+            .into_pyobject(py)?
+            .to_owned()
+            .into_any()
+            .unbind()),
+        DataType::String(_) => {
+            let s = row.get_string(pos);
+            Ok(s.into_pyobject(py)?.into_any().unbind())
+        }
+        DataType::Char(char_type) => {
+            let s = row.get_char(pos, char_type.length() as usize);
+            Ok(s.into_pyobject(py)?.into_any().unbind())
+        }
+        DataType::Bytes(_) => {
+            let b = row.get_bytes(pos);
+            Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+        }
+        DataType::Binary(binary_type) => {
+            let b = row.get_binary(pos, binary_type.length());
+            Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+        }
+        DataType::Decimal(decimal_type) => {
+            let decimal = row.get_decimal(
+                pos,
+                decimal_type.precision() as usize,
+                decimal_type.scale() as usize,
+            );
+            rust_decimal_to_python(py, &decimal)
+        }
+        DataType::Date(_) => {
+            let date = row.get_date(pos);
+            rust_date_to_python(py, date)
+        }
+        DataType::Time(_) => {
+            let time = row.get_time(pos);
+            rust_time_to_python(py, time)
+        }
+        DataType::Timestamp(ts_type) => {
+            let ts = row.get_timestamp_ntz(pos, ts_type.precision());
+            rust_timestamp_ntz_to_python(py, ts)
+        }
+        DataType::TimestampLTz(ts_type) => {
+            let ts = row.get_timestamp_ltz(pos, ts_type.precision());
+            rust_timestamp_ltz_to_python(py, ts)
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for conversion to Python: {data_type}"
+        ))),
+    }
+}
+
+/// Convert Rust Decimal to Python decimal.Decimal
+fn rust_decimal_to_python(py: Python, decimal: &fcore::row::Decimal) -> 
PyResult<Py<PyAny>> {
+    let decimal_ty = get_decimal_type(py)?;
+    let decimal_str = decimal.to_string();
+    let py_decimal = decimal_ty.call1((decimal_str,))?;
+    Ok(py_decimal.into_any().unbind())
+}
+
+/// Convert Rust Date (days since epoch) to Python datetime.date
+fn rust_date_to_python(py: Python, date: fcore::row::Date) -> 
PyResult<Py<PyAny>> {
+    use pyo3::types::PyDate;
+
+    let days_since_epoch = date.get_inner();
+    let epoch = jiff::civil::date(1970, 1, 1);
+    let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64);
+
+    let py_date = PyDate::new(
+        py,
+        civil_date.year() as i32,
+        civil_date.month() as u8,
+        civil_date.day() as u8,
+    )?;
+    Ok(py_date.into_any().unbind())
+}
+
+/// Convert Rust Time (millis since midnight) to Python datetime.time
+fn rust_time_to_python(py: Python, time: fcore::row::Time) -> 
PyResult<Py<PyAny>> {
+    use pyo3::types::PyTime;
+
+    let millis = time.get_inner() as i64;
+    let hours = millis / MILLIS_PER_HOUR;
+    let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE;
+    let seconds = (millis % MILLIS_PER_MINUTE) / MILLIS_PER_SECOND;
+    let microseconds = (millis % MILLIS_PER_SECOND) * MICROS_PER_MILLI;
+
+    let py_time = PyTime::new(
+        py,
+        hours as u8,
+        minutes as u8,
+        seconds as u8,
+        microseconds as u32,
+        None,
+    )?;
+    Ok(py_time.into_any().unbind())
+}
+
+/// Convert Rust TimestampNtz to Python naive datetime
+fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) -> 
PyResult<Py<PyAny>> {
+    use pyo3::types::PyDateTime;
+
+    let millis = ts.get_millisecond();
+    let nanos = ts.get_nano_of_millisecond();
+    let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / 
NANOS_PER_MICRO);
+
+    // Convert to civil datetime via jiff
+    let timestamp = jiff::Timestamp::from_microsecond(total_micros)
+        .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?;
+    let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    let py_dt = PyDateTime::new(
+        py,
+        civil_dt.year() as i32,
+        civil_dt.month() as u8,
+        civil_dt.day() as u8,
+        civil_dt.hour() as u8,
+        civil_dt.minute() as u8,
+        civil_dt.second() as u8,
+        (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds
+        None,
+    )?;
+    Ok(py_dt.into_any().unbind())
+}
+
+/// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC)
+fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) -> 
PyResult<Py<PyAny>> {
+    use pyo3::types::PyDateTime;
+
+    let millis = ts.get_epoch_millisecond();
+    let nanos = ts.get_nano_of_millisecond();
+    let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / 
NANOS_PER_MICRO);
+
+    // Convert to civil datetime via jiff
+    let timestamp = jiff::Timestamp::from_microsecond(total_micros)
+        .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?;
+    let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    let utc = get_utc_timezone(py)?;
+    let py_dt = PyDateTime::new(
+        py,
+        civil_dt.year() as i32,
+        civil_dt.month() as u8,
+        civil_dt.day() as u8,
+        civil_dt.hour() as u8,
+        civil_dt.minute() as u8,
+        civil_dt.second() as u8,
+        (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds
+        Some(&utc),
+    )?;
+    Ok(py_dt.into_any().unbind())
+}
+
+/// Convert an InternalRow to a Python dictionary
+pub fn internal_row_to_dict(
+    py: Python,
+    row: &dyn fcore::row::InternalRow,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<Py<PyAny>> {
+    let row_type = table_info.row_type();
+    let fields = row_type.fields();
+    let dict = pyo3::types::PyDict::new(py);
+
+    for (pos, field) in fields.iter().enumerate() {
+        let value = datum_to_python_value(py, row, pos, field.data_type())?;
+        dict.set_item(field.name(), value)?;
+    }
+
+    Ok(dict.into_any().unbind())
+}
+
 /// Cached decimal.Decimal type
 /// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
 static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
     pyo3::sync::PyOnceLock::new();
 
+/// Cached UTC timezone
+static UTC_TIMEZONE: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+
 /// Cached UTC epoch datetime
 static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
 
@@ -536,6 +936,21 @@ fn get_decimal_type(py: Python) -> 
PyResult<Bound<pyo3::types::PyType>> {
     Ok(ty.bind(py).clone())
 }
 
+/// Get the cached UTC timezone (datetime.timezone.utc), creating it once per 
interpreter.
+fn get_utc_timezone(py: Python) -> PyResult<Bound<pyo3::types::PyTzInfo>> {
+    let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> {
+        let datetime_mod = py.import("datetime")?;
+        let timezone = datetime_mod.getattr("timezone")?;
+        let utc = timezone.getattr("utc")?;
+        Ok(utc.unbind())
+    })?;
+    // Downcast to PyTzInfo for use with PyDateTime::new()
+    Ok(tz
+        .bind(py)
+        .clone()
+        .downcast_into::<pyo3::types::PyTzInfo>()?)
+}
+
 /// Get the cached UTC epoch datetime, creating it once per interpreter.
 fn get_utc_epoch(py: Python) -> PyResult<Bound<PyAny>> {
     let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
new file mode 100644
index 0000000..08b3597
--- /dev/null
+++ b/bindings/python/src/upsert.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{python_pk_to_generic_row, python_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Writer for upserting and deleting data in a Fluss primary key table.
+///
+/// Each upsert/delete operation is sent to the server and waits for 
acknowledgment.
+/// Multiple concurrent writers share a common WriterClient which batches 
requests
+/// for efficiency.
+///
+/// # Examples
+///     writer = table.new_upsert()
+///     await writer.upsert(row1)
+///     await writer.upsert(row2)
+///     await writer.delete(pk)
+///     await writer.flush()  # Ensures all pending operations are acknowledged
+#[pyclass]
+pub struct UpsertWriter {
+    inner: Arc<UpsertWriterInner>,
+}
+
+struct UpsertWriterInner {
+    table_upsert: fcore::client::TableUpsert,
+    /// Lazily initialized writer - created on first write operation
+    writer: Mutex<Option<fcore::client::UpsertWriter>>,
+    table_info: fcore::metadata::TableInfo,
+}
+
+#[pymethods]
+impl UpsertWriter {
+    /// Upsert a row into the table.
+    ///
+    /// If a row with the same primary key exists, it will be updated.
+    /// Otherwise, a new row will be inserted.
+    ///
+    /// Args:
+    ///     row: A dict, list, or tuple containing the row data.
+    ///          For dict: keys are column names, values are column values.
+    ///          For list/tuple: values must be in schema order.
+    ///
+    /// Returns:
+    ///     None on success
+    pub fn upsert<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'_, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            let mut guard = inner.get_or_create_writer().await?;
+            let writer = guard.as_mut().unwrap();
+            writer
+                .upsert(&generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+            Ok(())
+        })
+    }
+
+    /// Delete a row from the table by primary key.
+    ///
+    /// Args:
+    ///     pk: A dict, list, or tuple containing only the primary key values.
+    ///         For dict: keys are PK column names.
+    ///         For list/tuple: values in PK column order.
+    ///
+    /// Returns:
+    ///     None on success
+    pub fn delete<'py>(
+        &self,
+        py: Python<'py>,
+        pk: &Bound<'_, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_pk_to_generic_row(pk, 
&self.inner.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            let mut guard = inner.get_or_create_writer().await?;
+            let writer = guard.as_mut().unwrap();
+            writer
+                .delete(&generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+            Ok(())
+        })
+    }
+
+    /// Flush all pending upsert/delete operations to the server.
+    ///
+    /// This method sends all buffered operations and waits until they are
+    /// acknowledged according to the writer's ack configuration.
+    ///
+    /// Returns:
+    ///     None on success
+    pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            let writer_guard = inner.writer.lock().await;
+
+            if let Some(writer) = writer_guard.as_ref() {
+                writer
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            } else {
+                // Nothing to flush - no writer was created yet
+                Ok(())
+            }
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        "UpsertWriter()".to_string()
+    }
+}
+
+impl UpsertWriter {
+    /// Create an UpsertWriter from a TableUpsert.
+    ///
+    /// Optionally supports partial updates via column names or indices.
+    pub fn new(
+        table_upsert: fcore::client::TableUpsert,
+        table_info: fcore::metadata::TableInfo,
+        columns: Option<Vec<String>>,
+        column_indices: Option<Vec<usize>>,
+    ) -> PyResult<Self> {
+        // Apply partial update configuration if specified
+        let table_upsert = if let Some(cols) = columns {
+            let col_refs: Vec<&str> = cols.iter().map(|s| 
s.as_str()).collect();
+            table_upsert
+                .partial_update_with_column_names(&col_refs)
+                .map_err(|e| FlussError::new_err(e.to_string()))?
+        } else if let Some(indices) = column_indices {
+            table_upsert
+                .partial_update(Some(indices))
+                .map_err(|e| FlussError::new_err(e.to_string()))?
+        } else {
+            table_upsert
+        };
+
+        Ok(Self {
+            inner: Arc::new(UpsertWriterInner {
+                table_upsert,
+                writer: Mutex::new(None),
+                table_info,
+            }),
+        })
+    }
+}
+
+impl UpsertWriterInner {
+    /// Get the cached writer or create one on first use.
+    async fn get_or_create_writer(
+        &self,
+    ) -> PyResult<tokio::sync::MutexGuard<'_, 
Option<fcore::client::UpsertWriter>>> {
+        let mut guard = self.writer.lock().await;
+        if guard.is_none() {
+            let writer = self
+                .table_upsert
+                .create_writer()
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+            *guard = Some(writer);
+        }
+        Ok(guard)
+    }
+}
diff --git a/crates/fluss/src/client/table/lookup.rs 
b/crates/fluss/src/client/table/lookup.rs
index 69cb91e..5410002 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::bucketing::BucketingFunction;
-use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
 use crate::client::table::partition_getter::PartitionGetter;
 use crate::error::{Error, Result};
@@ -26,6 +25,7 @@ use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
 use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
 use crate::rpc::ApiError;
+use crate::rpc::RpcClient;
 use crate::rpc::message::LookupRequest;
 use std::sync::Arc;
 
@@ -34,19 +34,19 @@ use std::sync::Arc;
 /// Contains the rows returned from a lookup. For primary key lookups,
 /// this will contain at most one row. For prefix key lookups (future),
 /// this may contain multiple rows.
-pub struct LookupResult<'a> {
+pub struct LookupResult {
     rows: Vec<Vec<u8>>,
-    row_type: &'a RowType,
+    row_type: Arc<RowType>,
 }
 
-impl<'a> LookupResult<'a> {
+impl LookupResult {
     /// Creates a new LookupResult from a list of row bytes.
-    fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self {
+    fn new(rows: Vec<Vec<u8>>, row_type: Arc<RowType>) -> Self {
         Self { rows, row_type }
     }
 
     /// Creates an empty LookupResult.
-    fn empty(row_type: &'a RowType) -> Self {
+    fn empty(row_type: Arc<RowType>) -> Self {
         Self {
             rows: Vec::new(),
             row_type,
@@ -67,7 +67,7 @@ impl<'a> LookupResult<'a> {
         match self.rows.len() {
             0 => Ok(None),
             1 => Ok(Some(CompactedRow::from_bytes(
-                self.row_type,
+                &self.row_type,
                 &self.rows[0][SCHEMA_ID_LENGTH..],
             ))),
             _ => Err(Error::UnexpectedError {
@@ -82,7 +82,7 @@ impl<'a> LookupResult<'a> {
         self.rows
             .iter()
             // TODO Add schema id check and fetch when implementing prefix 
lookup
-            .map(|bytes| CompactedRow::from_bytes(self.row_type, 
&bytes[SCHEMA_ID_LENGTH..]))
+            .map(|bytes| CompactedRow::from_bytes(&self.row_type, 
&bytes[SCHEMA_ID_LENGTH..]))
             .collect()
     }
 }
@@ -104,20 +104,20 @@ impl<'a> LookupResult<'a> {
 /// ```
 // TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
 // TODO: Add create_typed_lookuper<T>() for typed lookups with POJO mapping
-pub struct TableLookup<'a> {
-    conn: &'a FlussConnection,
+pub struct TableLookup {
+    rpc_client: Arc<RpcClient>,
     table_info: TableInfo,
     metadata: Arc<Metadata>,
 }
 
-impl<'a> TableLookup<'a> {
+impl TableLookup {
     pub(super) fn new(
-        conn: &'a FlussConnection,
+        rpc_client: Arc<RpcClient>,
         table_info: TableInfo,
         metadata: Arc<Metadata>,
     ) -> Self {
         Self {
-            conn,
+            rpc_client,
             table_info,
             metadata,
         }
@@ -127,7 +127,7 @@ impl<'a> TableLookup<'a> {
     ///
     /// The lookuper will automatically encode the key and compute the bucket
     /// for each lookup using the appropriate bucketing function.
-    pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
+    pub fn create_lookuper(self) -> Result<Lookuper> {
         let num_buckets = self.table_info.get_num_buckets();
 
         // Get data lake format from table config for bucketing function
@@ -162,9 +162,11 @@ impl<'a> TableLookup<'a> {
             None
         };
 
+        let row_type = Arc::new(self.table_info.row_type().clone());
         Ok(Lookuper {
-            conn: self.conn,
+            rpc_client: self.rpc_client,
             table_path: Arc::new(self.table_info.table_path.clone()),
+            row_type,
             table_info: self.table_info,
             metadata: self.metadata,
             bucketing_function,
@@ -187,9 +189,10 @@ impl<'a> TableLookup<'a> {
 /// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
 /// let result = lookuper.lookup(&row).await?;
 /// ```
-pub struct Lookuper<'a> {
-    conn: &'a FlussConnection,
+pub struct Lookuper {
+    rpc_client: Arc<RpcClient>,
     table_info: TableInfo,
+    row_type: Arc<RowType>,
     table_path: Arc<TablePath>,
     metadata: Arc<Metadata>,
     bucketing_function: Box<dyn BucketingFunction>,
@@ -199,7 +202,7 @@ pub struct Lookuper<'a> {
     num_buckets: i32,
 }
 
-impl<'a> Lookuper<'a> {
+impl Lookuper {
     /// Looks up a value by its primary key.
     ///
     /// The key is encoded and the bucket is automatically computed using
@@ -211,7 +214,7 @@ impl<'a> Lookuper<'a> {
     /// # Returns
     /// * `Ok(LookupResult)` - The lookup result (may be empty if key not 
found)
     /// * `Err(Error)` - If the lookup fails
-    pub async fn lookup(&mut self, row: &dyn InternalRow) -> 
Result<LookupResult<'_>> {
+    pub async fn lookup(&mut self, row: &dyn InternalRow) -> 
Result<LookupResult> {
         // todo: support batch lookup
         let pk_bytes = self.primary_key_encoder.encode_key(row)?;
         let pk_bytes_vec = pk_bytes.to_vec();
@@ -231,7 +234,7 @@ impl<'a> Lookuper<'a> {
                 Some(id) => Some(id),
                 None => {
                     // Partition doesn't exist, return empty result (like Java)
-                    return Ok(LookupResult::empty(self.table_info.row_type()));
+                    return Ok(LookupResult::empty(Arc::clone(&self.row_type)));
                 }
             }
         } else {
@@ -266,8 +269,7 @@ impl<'a> Lookuper<'a> {
                     ),
                 })?;
 
-        let connections = self.conn.get_connections();
-        let connection = connections.get_connection(tablet_server).await?;
+        let connection = self.rpc_client.get_connection(tablet_server).await?;
 
         // Send lookup request
         let request = LookupRequest::new(table_id, partition_id, bucket_id, 
vec![pk_bytes_vec]);
@@ -294,10 +296,10 @@ impl<'a> Lookuper<'a> {
                 .filter_map(|pb_value| pb_value.values)
                 .collect();
 
-            return Ok(LookupResult::new(rows, self.table_info.row_type()));
+            return Ok(LookupResult::new(rows, Arc::clone(&self.row_type)));
         }
 
-        Ok(LookupResult::empty(self.table_info.row_type()))
+        Ok(LookupResult::empty(Arc::clone(&self.row_type)))
     }
 
     /// Returns a reference to the table info.
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 6d54933..37e9b45 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -112,14 +112,14 @@ impl<'a> FlussTable<'a> {
     ///     println!("Found value: {:?}", value);
     /// }
     /// ```
-    pub fn new_lookup(&self) -> Result<TableLookup<'_>> {
+    pub fn new_lookup(&self) -> Result<TableLookup> {
         if !self.has_primary_key {
             return Err(Error::UnsupportedOperation {
                 message: "Lookup is only supported for primary key 
tables".to_string(),
             });
         }
         Ok(TableLookup::new(
-            self.conn,
+            self.conn.get_connections(),
             self.table_info.clone(),
             self.metadata.clone(),
         ))
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 422f9d3..aa9fca4 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -97,7 +97,7 @@ impl<'a> TableScan<'a> {
     ///                 .column("col1", DataTypes::int())
     ///                 .column("col2", DataTypes::string())
     ///                 .column("col3", DataTypes::string())
-    ///                 .column("col3", DataTypes::string())
+    ///                 .column("col4", DataTypes::string())
     ///             .build()?,
     ///         ).build()?;
     ///     let table_path = TablePath::new("fluss".to_owned(), 
"rust_test_long".to_owned());
@@ -179,7 +179,6 @@ impl<'a> TableScan<'a> {
     ///     let admin = conn.get_admin().await?;
     ///     admin.create_table(&table_path, &table_descriptor, true)
     ///         .await?;
-    ///     let table_info = admin.get_table(&table_path).await?;
     ///     let table = conn.get_table(&table_path).await?;
     ///
     ///     // Project columns by column names
diff --git a/crates/fluss/src/row/encode/mod.rs 
b/crates/fluss/src/row/encode/mod.rs
index d5cf8ac..1ce7aef 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -27,7 +27,7 @@ use bytes::Bytes;
 
 /// An interface for encoding key of row into bytes.
 #[allow(dead_code)]
-pub trait KeyEncoder {
+pub trait KeyEncoder: Send + Sync {
     fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
 }
 
@@ -71,7 +71,7 @@ impl KeyEncoderFactory {
 /// 2. call method [`RowEncoder::encode_field()`] to write the row's field.
 /// 3. call method [`RowEncoder::finishRow()`] to finish the writing and get 
the written row.
 #[allow(dead_code)]
-pub trait RowEncoder {
+pub trait RowEncoder: Send + Sync {
     /// Start to write a new row.
     ///
     /// # Returns

Reply via email to