This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 31d5df5 feat: introduce kv tables support in python (#239)
31d5df5 is described below
commit 31d5df5d6d62cf5a7b208e8c8a03bee9730e5826
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 4 15:15:26 2026 +0000
feat: introduce kv tables support in python (#239)
---
bindings/python/example/example.py | 305 +++++++++++++++++++---
bindings/python/fluss/__init__.pyi | 81 +++++-
bindings/python/src/lib.rs | 6 +
bindings/python/src/lookup.rs | 111 ++++++++
bindings/python/src/table.rs | 417 ++++++++++++++++++++++++++++++-
bindings/python/src/upsert.rs | 188 ++++++++++++++
crates/fluss/src/client/table/lookup.rs | 50 ++--
crates/fluss/src/client/table/mod.rs | 4 +-
crates/fluss/src/client/table/scanner.rs | 3 +-
crates/fluss/src/row/encode/mod.rs | 4 +-
10 files changed, 1101 insertions(+), 68 deletions(-)
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 5d0302e..c359425 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -16,8 +16,9 @@
# under the License.
import asyncio
-import time
-from datetime import date, time as dt_time, datetime
+import traceback
+from datetime import date, datetime
+from datetime import time as dt_time
from decimal import Decimal
import pandas as pd
@@ -103,11 +104,34 @@ async def main():
pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
pa.array([95.2, 87.2, 92.1], type=pa.float32()),
pa.array([25, 30, 35], type=pa.int32()),
- pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11,
8)], type=pa.date32()),
- pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0,
0)], type=pa.time32("ms")),
- pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15,
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")),
- pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15,
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")),
- pa.array([Decimal("75000.00"), Decimal("82000.50"),
Decimal("95000.75")], type=pa.decimal128(10, 2)),
+ pa.array(
+ [date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)],
+ type=pa.date32(),
+ ),
+ pa.array(
+ [dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)],
+ type=pa.time32("ms"),
+ ),
+ pa.array(
+ [
+ datetime(2024, 1, 15, 10, 30),
+ datetime(2024, 1, 15, 11, 0),
+ datetime(2024, 1, 15, 11, 30),
+ ],
+ type=pa.timestamp("us"),
+ ),
+ pa.array(
+ [
+ datetime(2024, 1, 15, 10, 30),
+ datetime(2024, 1, 15, 11, 0),
+ datetime(2024, 1, 15, 11, 30),
+ ],
+ type=pa.timestamp("us", tz="UTC"),
+ ),
+ pa.array(
+ [Decimal("75000.00"), Decimal("82000.50"),
Decimal("95000.75")],
+ type=pa.decimal128(10, 2),
+ ),
],
schema=schema,
)
@@ -125,9 +149,18 @@ async def main():
pa.array([28, 32], type=pa.int32()),
pa.array([date(1996, 7, 22), date(1992, 12, 1)],
type=pa.date32()),
pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)],
type=pa.time32("ms")),
- pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16,
9, 30)], type=pa.timestamp("us")),
- pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16,
9, 30)], type=pa.timestamp("us", tz="UTC")),
- pa.array([Decimal("68000.00"), Decimal("72500.25")],
type=pa.decimal128(10, 2)),
+ pa.array(
+ [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9,
30)],
+ type=pa.timestamp("us"),
+ ),
+ pa.array(
+ [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9,
30)],
+ type=pa.timestamp("us", tz="UTC"),
+ ),
+ pa.array(
+ [Decimal("68000.00"), Decimal("72500.25")],
+ type=pa.decimal128(10, 2),
+ ),
],
schema=schema,
)
@@ -138,28 +171,35 @@ async def main():
# Test 3: Append single rows with Date, Time, Timestamp, Decimal
print("\n--- Testing single row append with temporal/decimal types
---")
# Dict input with all types including Date, Time, Timestamp, Decimal
- await append_writer.append({
- "id": 8,
- "name": "Helen",
- "score": 93.5,
- "age": 26,
- "birth_date": date(1998, 4, 10),
- "check_in_time": dt_time(11, 30, 45),
- "created_at": datetime(2024, 1, 17, 14, 0, 0),
- "updated_at": datetime(2024, 1, 17, 14, 0, 0),
- "salary": Decimal("88000.00"),
- })
+ await append_writer.append(
+ {
+ "id": 8,
+ "name": "Helen",
+ "score": 93.5,
+ "age": 26,
+ "birth_date": date(1998, 4, 10),
+ "check_in_time": dt_time(11, 30, 45),
+ "created_at": datetime(2024, 1, 17, 14, 0, 0),
+ "updated_at": datetime(2024, 1, 17, 14, 0, 0),
+ "salary": Decimal("88000.00"),
+ }
+ )
print("Successfully appended row (dict with Date, Time, Timestamp,
Decimal)")
# List input with all types
- await append_writer.append([
- 9, "Ivan", 90.0, 31,
- date(1993, 8, 25),
- dt_time(16, 45, 0),
- datetime(2024, 1, 17, 15, 30, 0),
- datetime(2024, 1, 17, 15, 30, 0),
- Decimal("91500.50"),
- ])
+ await append_writer.append(
+ [
+ 9,
+ "Ivan",
+ 90.0,
+ 31,
+ date(1993, 8, 25),
+ dt_time(16, 45, 0),
+ datetime(2024, 1, 17, 15, 30, 0),
+ datetime(2024, 1, 17, 15, 30, 0),
+ Decimal("91500.50"),
+ ]
+ )
print("Successfully appended row (list with Date, Time, Timestamp,
Decimal)")
# Test 4: Write Pandas DataFrame
@@ -172,8 +212,14 @@ async def main():
"age": [29, 27],
"birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
"check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
- "created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1,
18, 8, 30)],
- "updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1,
18, 8, 30)],
+ "created_at": [
+ datetime(2024, 1, 18, 8, 0),
+ datetime(2024, 1, 18, 8, 30),
+ ],
+ "updated_at": [
+ datetime(2024, 1, 18, 8, 0),
+ datetime(2024, 1, 18, 8, 30),
+ ],
"salary": [Decimal("79000.00"), Decimal("85500.75")],
}
)
@@ -249,6 +295,199 @@ async def main():
except Exception as e:
print(f"Error during scanning: {e}")
+ # =====================================================
+ # Demo: Primary Key Table with Lookup and Upsert
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Primary Key Table (Lookup & Upsert) ---")
+ print("=" * 60)
+
+ # Create a primary key table for lookup/upsert tests
+ # Include temporal and decimal types to test full conversion
+ pk_table_fields = [
+ pa.field("user_id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("email", pa.string()),
+ pa.field("age", pa.int32()),
+ pa.field("birth_date", pa.date32()),
+ pa.field("login_time", pa.time32("ms")),
+ pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ)
+ pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ
+ pa.field("balance", pa.decimal128(10, 2)),
+ ]
+ pk_schema = pa.schema(pk_table_fields)
+ fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
+
+ # Create table descriptor
+ pk_table_descriptor = fluss.TableDescriptor(
+ fluss_pk_schema,
+ bucket_count=3,
+ )
+
+ pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
+
+ try:
+ await admin.create_table(pk_table_path, pk_table_descriptor, True)
+ print(f"Created PK table: {pk_table_path}")
+ except Exception as e:
+ print(f"PK Table creation failed (may already exist): {e}")
+
+ # Get the PK table
+ pk_table = await conn.get_table(pk_table_path)
+ print(f"Got PK table: {pk_table}")
+ print(f"Has primary key: {pk_table.has_primary_key()}")
+
+ # --- Test Upsert ---
+ print("\n--- Testing Upsert ---")
+ try:
+ upsert_writer = pk_table.new_upsert()
+ print(f"Created upsert writer: {upsert_writer}")
+
+ await upsert_writer.upsert(
+ {
+ "user_id": 1,
+ "name": "Alice",
+ "email": "[email protected]",
+ "age": 25,
+ "birth_date": date(1999, 5, 15),
+ "login_time": dt_time(9, 30, 45, 123000), # 09:30:45.123
+ "created_at": datetime(
+ 2024, 1, 15, 10, 30, 45, 123456
+ ), # with microseconds
+ "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456),
+ "balance": Decimal("1234.56"),
+ }
+ )
+ print("Upserted user_id=1 (Alice)")
+
+ await upsert_writer.upsert(
+ {
+ "user_id": 2,
+ "name": "Bob",
+ "email": "[email protected]",
+ "age": 30,
+ "birth_date": date(1994, 3, 20),
+ "login_time": dt_time(14, 15, 30, 500000), # 14:15:30.500
+ "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+ "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+ "balance": Decimal("5678.91"),
+ }
+ )
+ print("Upserted user_id=2 (Bob)")
+
+ await upsert_writer.upsert(
+ {
+ "user_id": 3,
+ "name": "Charlie",
+ "email": "[email protected]",
+ "age": 35,
+ "birth_date": date(1989, 11, 8),
+ "login_time": dt_time(16, 45, 59, 999000), # 16:45:59.999
+ "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+ "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+ "balance": Decimal("9876.54"),
+ }
+ )
+ print("Upserted user_id=3 (Charlie)")
+
+ # Update an existing row (same PK, different values)
+ await upsert_writer.upsert(
+ {
+ "user_id": 1,
+ "name": "Alice Updated",
+ "email": "[email protected]",
+ "age": 26,
+ "birth_date": date(1999, 5, 15),
+ "login_time": dt_time(10, 11, 12, 345000), # 10:11:12.345
+ "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456), #
unchanged
+ "updated_at": datetime(
+ 2024, 1, 20, 15, 45, 30, 678901
+ ), # new update time
+ "balance": Decimal("2345.67"),
+ }
+ )
+ print("Updated user_id=1 (Alice -> Alice Updated)")
+
+ # Explicit flush to ensure all upserts are acknowledged
+ await upsert_writer.flush()
+ print("Flushed all upserts")
+
+ except Exception as e:
+ print(f"Error during upsert: {e}")
+ traceback.print_exc()
+
+ # --- Test Lookup ---
+ print("\n--- Testing Lookup ---")
+ try:
+ lookuper = pk_table.new_lookup()
+ print(f"Created lookuper: {lookuper}")
+
+ result = await lookuper.lookup({"user_id": 1})
+ if result:
+ print("Lookup user_id=1: Found!")
+ print(f" name: {result['name']}")
+ print(f" email: {result['email']}")
+ print(f" age: {result['age']}")
+ print(
+ f" birth_date: {result['birth_date']} (type:
{type(result['birth_date']).__name__})"
+ )
+ print(
+ f" login_time: {result['login_time']} (type:
{type(result['login_time']).__name__})"
+ )
+ print(
+ f" created_at: {result['created_at']} (type:
{type(result['created_at']).__name__})"
+ )
+ print(
+ f" updated_at: {result['updated_at']} (type:
{type(result['updated_at']).__name__})"
+ )
+ print(
+ f" balance: {result['balance']} (type:
{type(result['balance']).__name__})"
+ )
+ else:
+ print("Lookup user_id=1: Not found")
+
+ # Lookup another row
+ result = await lookuper.lookup({"user_id": 2})
+ if result:
+ print(f"Lookup user_id=2: Found! -> {result}")
+ else:
+ print("Lookup user_id=2: Not found")
+
+ # Lookup non-existent row
+ result = await lookuper.lookup({"user_id": 999})
+ if result:
+ print(f"Lookup user_id=999: Found! -> {result}")
+ else:
+ print("Lookup user_id=999: Not found (as expected)")
+
+ except Exception as e:
+ print(f"Error during lookup: {e}")
+ traceback.print_exc()
+
+ # --- Test Delete ---
+ print("\n--- Testing Delete ---")
+ try:
+ upsert_writer = pk_table.new_upsert()
+
+ # Delete only needs PK columns - much simpler API!
+ await upsert_writer.delete({"user_id": 3})
+ print("Deleted user_id=3")
+
+ # Explicit flush to ensure delete is acknowledged
+ await upsert_writer.flush()
+ print("Flushed delete")
+
+ lookuper = pk_table.new_lookup()
+ result = await lookuper.lookup({"user_id": 3})
+ if result:
+ print(f"Lookup user_id=3 after delete: Still found! -> {result}")
+ else:
+ print("Lookup user_id=3 after delete: Not found (deletion
confirmed)")
+
+ except Exception as e:
+ print(f"Error during delete: {e}")
+ traceback.print_exc()
+
# Demo: Column projection
print("\n--- Testing Column Projection ---")
try:
@@ -258,7 +497,9 @@ async def main():
scanner_index.subscribe(None, None)
df_projected = scanner_index.to_pandas()
print(df_projected.head())
- print(f" Projected {df_projected.shape[1]} columns:
{list(df_projected.columns)}")
+ print(
+ f" Projected {df_projected.shape[1]} columns:
{list(df_projected.columns)}"
+ )
# Project specific columns by name (Pythonic!)
print("\n2. Projection by name ['name', 'score'] (Pythonic):")
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index 6073070..c911ebe 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -45,7 +45,12 @@ class FlussConnection:
async def get_table(self, table_path: TablePath) -> FlussTable: ...
def close(self) -> None: ...
def __enter__(self) -> FlussConnection: ...
- def __exit__(self, exc_type: Optional[type], exc_value:
Optional[BaseException], traceback: Optional[TracebackType]) -> bool: ...
+ def __exit__(
+ self,
+ exc_type: Optional[type],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> bool: ...
def __repr__(self) -> str: ...
class FlussAdmin:
@@ -61,7 +66,17 @@ class FlussAdmin:
class FlussTable:
async def new_append_writer(self) -> AppendWriter: ...
- async def new_log_scanner(self) -> LogScanner: ...
+ async def new_log_scanner(
+ self,
+ project: Optional[List[int]] = None,
+ columns: Optional[List[str]] = None,
+ ) -> LogScanner: ...
+ def new_upsert(
+ self,
+ columns: Optional[List[str]] = None,
+ column_indices: Optional[List[int]] = None,
+ ) -> UpsertWriter: ...
+ def new_lookup(self) -> Lookuper: ...
def get_table_info(self) -> TableInfo: ...
def get_table_path(self) -> TablePath: ...
def has_primary_key(self) -> bool: ...
@@ -100,6 +115,49 @@ class AppendWriter:
def flush(self) -> None: ...
def __repr__(self) -> str: ...
+class UpsertWriter:
+ """Writer for upserting and deleting data in a Fluss primary key table."""
+
+ async def upsert(self, row: dict | list | tuple) -> None:
+ """Upsert a row into the table.
+
+ If a row with the same primary key exists, it will be updated.
+ Otherwise, a new row will be inserted.
+
+ Args:
+ row: Dictionary mapping field names to values, or
+ list/tuple of values in schema order
+ """
+ ...
+ async def delete(self, pk: dict | list | tuple) -> None:
+ """Delete a row from the table by primary key.
+
+ Args:
+ pk: Dictionary with PK column names as keys, or
+ list/tuple of PK values in PK column order
+ """
+ ...
+ async def flush(self) -> None:
+ """Flush all pending upsert/delete operations to the server."""
+ ...
+ def __repr__(self) -> str: ...
+
+class Lookuper:
+ """Lookuper for performing primary key lookups on a Fluss table."""
+
+ async def lookup(self, pk: dict | list | tuple) -> Optional[Dict[str,
object]]:
+ """Lookup a row by its primary key.
+
+ Args:
+ pk: Dictionary with PK column names as keys, or
+ list/tuple of PK values in PK column order
+
+ Returns:
+ A dict containing the row data if found, None otherwise.
+ """
+ ...
+ def __repr__(self) -> str: ...
+
class LogScanner:
def subscribe(
self, start_timestamp: Optional[int], end_timestamp: Optional[int]
@@ -109,14 +167,27 @@ class LogScanner:
def __repr__(self) -> str: ...
class Schema:
- def __init__(self, schema: pa.Schema, primary_keys: Optional[List[str]] =
None) -> None: ...
+ def __init__(
+ self, schema: pa.Schema, primary_keys: Optional[List[str]] = None
+ ) -> None: ...
def get_column_names(self) -> List[str]: ...
def get_column_types(self) -> List[str]: ...
- def get_columns(self) -> List[Tuple[str,str]]: ...
+ def get_columns(self) -> List[Tuple[str, str]]: ...
def __str__(self) -> str: ...
class TableDescriptor:
- def __init__(self, schema: Schema, **kwargs: str) -> None: ...
+ def __init__(
+ self,
+ schema: Schema,
+ *,
+ partition_keys: Optional[List[str]] = None,
+ bucket_count: Optional[int] = None,
+ bucket_keys: Optional[List[str]] = None,
+ comment: Optional[str] = None,
+ log_format: Optional[str] = None,
+ kv_format: Optional[str] = None,
+ **properties: str,
+ ) -> None: ...
def get_schema(self) -> Schema: ...
class TablePath:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 49d5179..3da0b25 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -25,16 +25,20 @@ mod admin;
mod config;
mod connection;
mod error;
+mod lookup;
mod metadata;
mod table;
+mod upsert;
mod utils;
pub use admin::*;
pub use config::*;
pub use connection::*;
pub use error::*;
+pub use lookup::*;
pub use metadata::*;
pub use table::*;
+pub use upsert::*;
pub use utils::*;
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
@@ -55,6 +59,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<FlussAdmin>()?;
m.add_class::<FlussTable>()?;
m.add_class::<AppendWriter>()?;
+ m.add_class::<UpsertWriter>()?;
+ m.add_class::<Lookuper>()?;
m.add_class::<Schema>()?;
m.add_class::<LogScanner>()?;
m.add_class::<LakeSnapshot>()?;
diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
new file mode 100644
index 0000000..8d91a61
--- /dev/null
+++ b/bindings/python/src/lookup.rs
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Lookuper for performing primary key lookups on a Fluss table.
+///
+/// The Lookuper caches key encoders and bucketing functions, making
+/// repeated lookups efficient. Create once and reuse for multiple lookups.
+///
+/// # Example:
+/// lookuper = table.new_lookup()
+/// result = await lookuper.lookup({"user_id": 1})
+/// result2 = await lookuper.lookup({"user_id": 2}) # Reuses cached
encoders
+#[pyclass]
+pub struct Lookuper {
+ inner: Arc<Mutex<fcore::client::Lookuper>>,
+ table_info: Arc<fcore::metadata::TableInfo>,
+}
+
+#[pymethods]
+impl Lookuper {
+ /// Lookup a row by its primary key.
+ ///
+ /// Args:
+ /// pk: A dict, list, or tuple containing only the primary key values.
+ /// For dict: keys are PK column names.
+ /// For list/tuple: values in PK column order.
+ ///
+ /// Returns:
+ /// A dict containing the row data if found, None otherwise.
+ pub fn lookup<'py>(
+ &self,
+ py: Python<'py>,
+ pk: &Bound<'_, PyAny>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let generic_row = python_pk_to_generic_row(pk, &self.table_info)?;
+ let inner = self.inner.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ // Perform async lookup
+ let result = {
+ let mut lookuper = inner.lock().await;
+ lookuper
+ .lookup(&generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ };
+
+ // Extract row data
+ let row_opt = result
+ .get_single_row()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ // Convert to Python with GIL
+ Python::attach(|py| match row_opt {
+ Some(compacted_row) => internal_row_to_dict(py,
&compacted_row, &table_info),
+ None => Ok(py.None()),
+ })
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "Lookuper()".to_string()
+ }
+}
+
+impl Lookuper {
+ /// Create a Lookuper from connection components.
+ ///
+ /// This creates the core Lookuper which caches encoders and bucketing
functions.
+ pub fn new(
+ connection: &Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ ) -> PyResult<Self> {
+ let fluss_table = fcore::client::FlussTable::new(connection, metadata,
table_info.clone());
+
+ let table_lookup = fluss_table
+ .new_lookup()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let lookuper = table_lookup
+ .create_lookuper()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ Ok(Self {
+ inner: Arc::new(Mutex::new(lookuper)),
+ table_info: Arc::new(table_info),
+ })
+ }
+}
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 48f09e7..4554ca1 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -36,6 +36,7 @@ const MICROS_PER_MILLI: i64 = 1_000;
const MICROS_PER_SECOND: i64 = 1_000_000;
const MICROS_PER_DAY: i64 = 86_400_000_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
+const NANOS_PER_MICRO: i64 = 1_000;
/// Represents a Fluss table for data operations
#[pyclass]
@@ -128,6 +129,70 @@ impl FlussTable {
self.has_primary_key
}
+ /// Create a new lookuper for primary key lookups.
+ ///
+ /// This is only available for tables with a primary key.
+ pub fn new_lookup(&self, _py: Python) -> PyResult<crate::Lookuper> {
+ if !self.has_primary_key {
+ return Err(FlussError::new_err(
+ "Lookup is only supported for primary key tables",
+ ));
+ }
+
+ crate::Lookuper::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ )
+ }
+
+ /// Create a new upsert writer for the table.
+ ///
+ /// This is only available for tables with a primary key.
+ ///
+ /// Args:
+ /// columns: Optional list of column names for partial update.
+ /// Only the specified columns will be updated.
+ /// column_indices: Optional list of column indices (0-based) for
partial update.
+ /// Alternative to `columns` parameter.
+ #[pyo3(signature = (columns=None, column_indices=None))]
+ pub fn new_upsert(
+ &self,
+ _py: Python,
+ columns: Option<Vec<String>>,
+ column_indices: Option<Vec<usize>>,
+ ) -> PyResult<crate::UpsertWriter> {
+ if !self.has_primary_key {
+ return Err(FlussError::new_err(
+ "Upsert is only supported for primary key tables",
+ ));
+ }
+
+ // Validate that at most one parameter is specified
+ if columns.is_some() && column_indices.is_some() {
+ return Err(FlussError::new_err(
+ "Specify only one of 'columns' or 'column_indices', not both",
+ ));
+ }
+
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let table_upsert = fluss_table
+ .new_upsert()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ crate::UpsertWriter::new(
+ table_upsert,
+ self.table_info.clone(),
+ columns,
+ column_indices,
+ )
+ }
+
fn __repr__(&self) -> String {
format!(
"FlussTable(path={}.{})",
@@ -358,7 +423,7 @@ where
}
/// Convert Python row (dict/list/tuple) to GenericRow based on schema
-fn python_to_generic_row(
+pub fn python_to_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
) -> PyResult<fcore::row::GenericRow<'static>> {
@@ -423,6 +488,115 @@ fn python_to_generic_row(
Ok(fcore::row::GenericRow { values: datums })
}
+/// Convert Python primary key values (dict/list/tuple) to GenericRow.
+/// Only requires PK columns; non-PK columns are filled with Null.
+/// For dict: keys should be PK column names.
+/// For list/tuple: values should be PK values in PK column order.
+pub fn python_pk_to_generic_row(
+ row: &Bound<PyAny>,
+ table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+ let schema = table_info.get_schema();
+ let row_type = table_info.row_type();
+ let fields = row_type.fields();
+ let pk_indexes = schema.primary_key_indexes();
+ let pk_names: Vec<&str> = schema.primary_key_column_names();
+
+ if pk_indexes.is_empty() {
+ return Err(FlussError::new_err(
+ "Table has no primary key; cannot use PK-only row",
+ ));
+ }
+
+ // Initialize all datums as Null
+ let mut datums: Vec<fcore::row::Datum<'static>> =
vec![fcore::row::Datum::Null; fields.len()];
+
+ // Extract with user-friendly error message
+ let row_input: RowInput = row.extract().map_err(|_| {
+ let type_name = row
+ .get_type()
+ .name()
+ .map(|n| n.to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+ FlussError::new_err(format!(
+ "PK row must be a dict, list, or tuple; got {type_name}"
+ ))
+ })?;
+
+ match row_input {
+ RowInput::Dict(dict) => {
+ // Validate keys are PK columns
+ for (k, _) in dict.iter() {
+ let key_str = k.extract::<&str>().map_err(|_| {
+ let key_type = k
+ .get_type()
+ .name()
+ .map(|n| n.to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+ FlussError::new_err(format!("PK dict keys must be strings;
got {key_type}"))
+ })?;
+
+ if !pk_names.contains(&key_str) {
+ return Err(FlussError::new_err(format!(
+ "Unknown PK field '{}'. Expected PK fields: {}",
+ key_str,
+ pk_names.join(", ")
+ )));
+ }
+ }
+
+ // Extract PK values
+ for (i, pk_idx) in pk_indexes.iter().enumerate() {
+ let pk_name = pk_names[i];
+ let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ let value = dict
+ .get_item(pk_name)?
+ .ok_or_else(|| FlussError::new_err(format!("Missing PK
field: {}", pk_name)))?;
+ datums[*pk_idx] = python_value_to_datum(&value,
field.data_type())
+ .map_err(|e| FlussError::new_err(format!("PK field '{}':
{}", pk_name, e)))?;
+ }
+ }
+
+ RowInput::List(list) => {
+ if list.len() != pk_indexes.len() {
+ return Err(FlussError::new_err(format!(
+ "PK list must have {} elements (PK columns), got {}",
+ pk_indexes.len(),
+ list.len()
+ )));
+ }
+ for (i, pk_idx) in pk_indexes.iter().enumerate() {
+ let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ let value = list.get_item(i)?;
+ datums[*pk_idx] =
+ python_value_to_datum(&value,
field.data_type()).map_err(|e| {
+ FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
+ })?;
+ }
+ }
+
+ RowInput::Tuple(tuple) => {
+ if tuple.len() != pk_indexes.len() {
+ return Err(FlussError::new_err(format!(
+ "PK tuple must have {} elements (PK columns), got {}",
+ pk_indexes.len(),
+ tuple.len()
+ )));
+ }
+ for (i, pk_idx) in pk_indexes.iter().enumerate() {
+ let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ let value = tuple.get_item(i)?;
+ datums[*pk_idx] =
+ python_value_to_datum(&value,
field.data_type()).map_err(|e| {
+ FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
+ })?;
+ }
+ }
+ }
+
+ Ok(fcore::row::GenericRow { values: datums })
+}
+
/// Convert Python value to Datum based on data type
fn python_value_to_datum(
value: &Bound<PyAny>,
@@ -516,11 +690,237 @@ fn python_value_to_datum(
}
}
+/// Convert Rust Datum to Python value based on data type.
+/// This is the reverse of python_value_to_datum.
+pub fn datum_to_python_value(
+ py: Python,
+ row: &dyn fcore::row::InternalRow,
+ pos: usize,
+ data_type: &fcore::metadata::DataType,
+) -> PyResult<Py<PyAny>> {
+ use fcore::metadata::DataType;
+
+ // Check for null first
+ if row.is_null_at(pos) {
+ return Ok(py.None());
+ }
+
+ match data_type {
+ DataType::Boolean(_) => Ok(row
+ .get_boolean(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::TinyInt(_) => Ok(row
+ .get_byte(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::SmallInt(_) => Ok(row
+ .get_short(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::Int(_) => Ok(row
+ .get_int(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::BigInt(_) => Ok(row
+ .get_long(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::Float(_) => Ok(row
+ .get_float(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::Double(_) => Ok(row
+ .get_double(pos)
+ .into_pyobject(py)?
+ .to_owned()
+ .into_any()
+ .unbind()),
+ DataType::String(_) => {
+ let s = row.get_string(pos);
+ Ok(s.into_pyobject(py)?.into_any().unbind())
+ }
+ DataType::Char(char_type) => {
+ let s = row.get_char(pos, char_type.length() as usize);
+ Ok(s.into_pyobject(py)?.into_any().unbind())
+ }
+ DataType::Bytes(_) => {
+ let b = row.get_bytes(pos);
+ Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+ }
+ DataType::Binary(binary_type) => {
+ let b = row.get_binary(pos, binary_type.length());
+ Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+ }
+ DataType::Decimal(decimal_type) => {
+ let decimal = row.get_decimal(
+ pos,
+ decimal_type.precision() as usize,
+ decimal_type.scale() as usize,
+ );
+ rust_decimal_to_python(py, &decimal)
+ }
+ DataType::Date(_) => {
+ let date = row.get_date(pos);
+ rust_date_to_python(py, date)
+ }
+ DataType::Time(_) => {
+ let time = row.get_time(pos);
+ rust_time_to_python(py, time)
+ }
+ DataType::Timestamp(ts_type) => {
+ let ts = row.get_timestamp_ntz(pos, ts_type.precision());
+ rust_timestamp_ntz_to_python(py, ts)
+ }
+ DataType::TimestampLTz(ts_type) => {
+ let ts = row.get_timestamp_ltz(pos, ts_type.precision());
+ rust_timestamp_ltz_to_python(py, ts)
+ }
+ _ => Err(FlussError::new_err(format!(
+ "Unsupported data type for conversion to Python: {data_type}"
+ ))),
+ }
+}
+
+/// Convert Rust Decimal to Python decimal.Decimal
+fn rust_decimal_to_python(py: Python, decimal: &fcore::row::Decimal) ->
PyResult<Py<PyAny>> {
+ let decimal_ty = get_decimal_type(py)?;
+ let decimal_str = decimal.to_string();
+ let py_decimal = decimal_ty.call1((decimal_str,))?;
+ Ok(py_decimal.into_any().unbind())
+}
+
+/// Convert Rust Date (days since epoch) to Python datetime.date
+fn rust_date_to_python(py: Python, date: fcore::row::Date) ->
PyResult<Py<PyAny>> {
+ use pyo3::types::PyDate;
+
+ let days_since_epoch = date.get_inner();
+ let epoch = jiff::civil::date(1970, 1, 1);
+ let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64);
+
+ let py_date = PyDate::new(
+ py,
+ civil_date.year() as i32,
+ civil_date.month() as u8,
+ civil_date.day() as u8,
+ )?;
+ Ok(py_date.into_any().unbind())
+}
+
+/// Convert Rust Time (millis since midnight) to Python datetime.time
+fn rust_time_to_python(py: Python, time: fcore::row::Time) ->
PyResult<Py<PyAny>> {
+ use pyo3::types::PyTime;
+
+ let millis = time.get_inner() as i64;
+ let hours = millis / MILLIS_PER_HOUR;
+ let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE;
+ let seconds = (millis % MILLIS_PER_MINUTE) / MILLIS_PER_SECOND;
+ let microseconds = (millis % MILLIS_PER_SECOND) * MICROS_PER_MILLI;
+
+ let py_time = PyTime::new(
+ py,
+ hours as u8,
+ minutes as u8,
+ seconds as u8,
+ microseconds as u32,
+ None,
+ )?;
+ Ok(py_time.into_any().unbind())
+}
+
+/// Convert Rust TimestampNtz to Python naive datetime
+fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) ->
PyResult<Py<PyAny>> {
+ use pyo3::types::PyDateTime;
+
+ let millis = ts.get_millisecond();
+ let nanos = ts.get_nano_of_millisecond();
+ let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 /
NANOS_PER_MICRO);
+
+ // Convert to civil datetime via jiff
+ let timestamp = jiff::Timestamp::from_microsecond(total_micros)
+ .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?;
+ let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+ let py_dt = PyDateTime::new(
+ py,
+ civil_dt.year() as i32,
+ civil_dt.month() as u8,
+ civil_dt.day() as u8,
+ civil_dt.hour() as u8,
+ civil_dt.minute() as u8,
+ civil_dt.second() as u8,
+ (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds
+ None,
+ )?;
+ Ok(py_dt.into_any().unbind())
+}
+
+/// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC)
+fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) ->
PyResult<Py<PyAny>> {
+ use pyo3::types::PyDateTime;
+
+ let millis = ts.get_epoch_millisecond();
+ let nanos = ts.get_nano_of_millisecond();
+ let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 /
NANOS_PER_MICRO);
+
+ // Convert to civil datetime via jiff
+ let timestamp = jiff::Timestamp::from_microsecond(total_micros)
+ .map_err(|e| FlussError::new_err(format!("Invalid timestamp: {e}")))?;
+ let civil_dt = timestamp.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+ let utc = get_utc_timezone(py)?;
+ let py_dt = PyDateTime::new(
+ py,
+ civil_dt.year() as i32,
+ civil_dt.month() as u8,
+ civil_dt.day() as u8,
+ civil_dt.hour() as u8,
+ civil_dt.minute() as u8,
+ civil_dt.second() as u8,
+ (civil_dt.subsec_nanosecond() / 1000) as u32, // microseconds
+ Some(&utc),
+ )?;
+ Ok(py_dt.into_any().unbind())
+}
+
+/// Convert an InternalRow to a Python dictionary
+pub fn internal_row_to_dict(
+ py: Python,
+ row: &dyn fcore::row::InternalRow,
+ table_info: &fcore::metadata::TableInfo,
+) -> PyResult<Py<PyAny>> {
+ let row_type = table_info.row_type();
+ let fields = row_type.fields();
+ let dict = pyo3::types::PyDict::new(py);
+
+ for (pos, field) in fields.iter().enumerate() {
+ let value = datum_to_python_value(py, row, pos, field.data_type())?;
+ dict.set_item(field.name(), value)?;
+ }
+
+ Ok(dict.into_any().unbind())
+}
+
/// Cached decimal.Decimal type
/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
pyo3::sync::PyOnceLock::new();
+/// Cached UTC timezone
+static UTC_TIMEZONE: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
+
/// Cached UTC epoch type
static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
@@ -536,6 +936,21 @@ fn get_decimal_type(py: Python) ->
PyResult<Bound<pyo3::types::PyType>> {
Ok(ty.bind(py).clone())
}
+/// Get the cached UTC timezone (datetime.timezone.utc), creating it once per
interpreter.
+fn get_utc_timezone(py: Python) -> PyResult<Bound<pyo3::types::PyTzInfo>> {
+ let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> {
+ let datetime_mod = py.import("datetime")?;
+ let timezone = datetime_mod.getattr("timezone")?;
+ let utc = timezone.getattr("utc")?;
+ Ok(utc.unbind())
+ })?;
+ // Downcast to PyTzInfo for use with PyDateTime::new()
+ Ok(tz
+ .bind(py)
+ .clone()
+ .downcast_into::<pyo3::types::PyTzInfo>()?)
+}
+
/// Get the cached UTC epoch datetime, creating it once per interpreter.
fn get_utc_epoch(py: Python) -> PyResult<Bound<PyAny>> {
let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
new file mode 100644
index 0000000..08b3597
--- /dev/null
+++ b/bindings/python/src/upsert.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{python_pk_to_generic_row, python_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Writer for upserting and deleting data in a Fluss primary key table.
+///
+/// Each upsert/delete operation is sent to the server and waits for
acknowledgment.
+/// Multiple concurrent writers share a common WriterClient which batches
requests
+/// for efficiency.
+///
+/// # Examples
+/// writer = table.new_upsert()
+/// await writer.upsert(row1)
+/// await writer.upsert(row2)
+/// await writer.delete(pk)
+/// await writer.flush() # Ensures all pending operations are acknowledged
+#[pyclass]
+pub struct UpsertWriter {
+ inner: Arc<UpsertWriterInner>,
+}
+
+struct UpsertWriterInner {
+ table_upsert: fcore::client::TableUpsert,
+ /// Lazily initialized writer - created on first write operation
+ writer: Mutex<Option<fcore::client::UpsertWriter>>,
+ table_info: fcore::metadata::TableInfo,
+}
+
+#[pymethods]
+impl UpsertWriter {
+ /// Upsert a row into the table.
+ ///
+ /// If a row with the same primary key exists, it will be updated.
+ /// Otherwise, a new row will be inserted.
+ ///
+ /// Args:
+ /// row: A dict, list, or tuple containing the row data.
+ /// For dict: keys are column names, values are column values.
+ /// For list/tuple: values must be in schema order.
+ ///
+ /// Returns:
+ /// None on success
+ pub fn upsert<'py>(
+ &self,
+ py: Python<'py>,
+ row: &Bound<'_, PyAny>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
+ let inner = self.inner.clone();
+
+ future_into_py(py, async move {
+ let mut guard = inner.get_or_create_writer().await?;
+ let writer = guard.as_mut().unwrap();
+ writer
+ .upsert(&generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ }
+
+ /// Delete a row from the table by primary key.
+ ///
+ /// Args:
+ /// pk: A dict, list, or tuple containing only the primary key values.
+ /// For dict: keys are PK column names.
+ /// For list/tuple: values in PK column order.
+ ///
+ /// Returns:
+ /// None on success
+ pub fn delete<'py>(
+ &self,
+ py: Python<'py>,
+ pk: &Bound<'_, PyAny>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let generic_row = python_pk_to_generic_row(pk,
&self.inner.table_info)?;
+ let inner = self.inner.clone();
+
+ future_into_py(py, async move {
+ let mut guard = inner.get_or_create_writer().await?;
+ let writer = guard.as_mut().unwrap();
+ writer
+ .delete(&generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ }
+
+ /// Flush all pending upsert/delete operations to the server.
+ ///
+ /// This method sends all buffered operations and blocks until they are
+ /// acknowledged according to the writer's ack configuration.
+ ///
+ /// Returns:
+ /// None on success
+ pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+ let inner = self.inner.clone();
+
+ future_into_py(py, async move {
+ let writer_guard = inner.writer.lock().await;
+
+ if let Some(writer) = writer_guard.as_ref() {
+ writer
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ } else {
+ // Nothing to flush - no writer was created yet
+ Ok(())
+ }
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "UpsertWriter()".to_string()
+ }
+}
+
+impl UpsertWriter {
+ /// Create an UpsertWriter from a TableUpsert.
+ ///
+ /// Optionally supports partial updates via column names or indices.
+ pub fn new(
+ table_upsert: fcore::client::TableUpsert,
+ table_info: fcore::metadata::TableInfo,
+ columns: Option<Vec<String>>,
+ column_indices: Option<Vec<usize>>,
+ ) -> PyResult<Self> {
+ // Apply partial update configuration if specified
+ let table_upsert = if let Some(cols) = columns {
+ let col_refs: Vec<&str> = cols.iter().map(|s|
s.as_str()).collect();
+ table_upsert
+ .partial_update_with_column_names(&col_refs)
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else if let Some(indices) = column_indices {
+ table_upsert
+ .partial_update(Some(indices))
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ table_upsert
+ };
+
+ Ok(Self {
+ inner: Arc::new(UpsertWriterInner {
+ table_upsert,
+ writer: Mutex::new(None),
+ table_info,
+ }),
+ })
+ }
+}
+
+impl UpsertWriterInner {
+ /// Get the cached writer or create one on first use.
+ async fn get_or_create_writer(
+ &self,
+ ) -> PyResult<tokio::sync::MutexGuard<'_,
Option<fcore::client::UpsertWriter>>> {
+ let mut guard = self.writer.lock().await;
+ if guard.is_none() {
+ let writer = self
+ .table_upsert
+ .create_writer()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ *guard = Some(writer);
+ }
+ Ok(guard)
+ }
+}
diff --git a/crates/fluss/src/client/table/lookup.rs
b/crates/fluss/src/client/table/lookup.rs
index 69cb91e..5410002 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::bucketing::BucketingFunction;
-use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
@@ -26,6 +25,7 @@ use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
use crate::rpc::ApiError;
+use crate::rpc::RpcClient;
use crate::rpc::message::LookupRequest;
use std::sync::Arc;
@@ -34,19 +34,19 @@ use std::sync::Arc;
/// Contains the rows returned from a lookup. For primary key lookups,
/// this will contain at most one row. For prefix key lookups (future),
/// this may contain multiple rows.
-pub struct LookupResult<'a> {
+pub struct LookupResult {
rows: Vec<Vec<u8>>,
- row_type: &'a RowType,
+ row_type: Arc<RowType>,
}
-impl<'a> LookupResult<'a> {
+impl LookupResult {
/// Creates a new LookupResult from a list of row bytes.
- fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self {
+ fn new(rows: Vec<Vec<u8>>, row_type: Arc<RowType>) -> Self {
Self { rows, row_type }
}
/// Creates an empty LookupResult.
- fn empty(row_type: &'a RowType) -> Self {
+ fn empty(row_type: Arc<RowType>) -> Self {
Self {
rows: Vec::new(),
row_type,
@@ -67,7 +67,7 @@ impl<'a> LookupResult<'a> {
match self.rows.len() {
0 => Ok(None),
1 => Ok(Some(CompactedRow::from_bytes(
- self.row_type,
+ &self.row_type,
&self.rows[0][SCHEMA_ID_LENGTH..],
))),
_ => Err(Error::UnexpectedError {
@@ -82,7 +82,7 @@ impl<'a> LookupResult<'a> {
self.rows
.iter()
// TODO Add schema id check and fetch when implementing prefix
lookup
- .map(|bytes| CompactedRow::from_bytes(self.row_type,
&bytes[SCHEMA_ID_LENGTH..]))
+ .map(|bytes| CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
}
@@ -104,20 +104,20 @@ impl<'a> LookupResult<'a> {
/// ```
// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
// TODO: Add create_typed_lookuper<T>() for typed lookups with POJO mapping
-pub struct TableLookup<'a> {
- conn: &'a FlussConnection,
+pub struct TableLookup {
+ rpc_client: Arc<RpcClient>,
table_info: TableInfo,
metadata: Arc<Metadata>,
}
-impl<'a> TableLookup<'a> {
+impl TableLookup {
pub(super) fn new(
- conn: &'a FlussConnection,
+ rpc_client: Arc<RpcClient>,
table_info: TableInfo,
metadata: Arc<Metadata>,
) -> Self {
Self {
- conn,
+ rpc_client,
table_info,
metadata,
}
@@ -127,7 +127,7 @@ impl<'a> TableLookup<'a> {
///
/// The lookuper will automatically encode the key and compute the bucket
/// for each lookup using the appropriate bucketing function.
- pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
+ pub fn create_lookuper(self) -> Result<Lookuper> {
let num_buckets = self.table_info.get_num_buckets();
// Get data lake format from table config for bucketing function
@@ -162,9 +162,11 @@ impl<'a> TableLookup<'a> {
None
};
+ let row_type = Arc::new(self.table_info.row_type().clone());
Ok(Lookuper {
- conn: self.conn,
+ rpc_client: self.rpc_client,
table_path: Arc::new(self.table_info.table_path.clone()),
+ row_type,
table_info: self.table_info,
metadata: self.metadata,
bucketing_function,
@@ -187,9 +189,10 @@ impl<'a> TableLookup<'a> {
/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
/// let result = lookuper.lookup(&row).await?;
/// ```
-pub struct Lookuper<'a> {
- conn: &'a FlussConnection,
+pub struct Lookuper {
+ rpc_client: Arc<RpcClient>,
table_info: TableInfo,
+ row_type: Arc<RowType>,
table_path: Arc<TablePath>,
metadata: Arc<Metadata>,
bucketing_function: Box<dyn BucketingFunction>,
@@ -199,7 +202,7 @@ pub struct Lookuper<'a> {
num_buckets: i32,
}
-impl<'a> Lookuper<'a> {
+impl Lookuper {
/// Looks up a value by its primary key.
///
/// The key is encoded and the bucket is automatically computed using
@@ -211,7 +214,7 @@ impl<'a> Lookuper<'a> {
/// # Returns
/// * `Ok(LookupResult)` - The lookup result (may be empty if key not
found)
/// * `Err(Error)` - If the lookup fails
- pub async fn lookup(&mut self, row: &dyn InternalRow) ->
Result<LookupResult<'_>> {
+ pub async fn lookup(&mut self, row: &dyn InternalRow) ->
Result<LookupResult> {
// todo: support batch lookup
let pk_bytes = self.primary_key_encoder.encode_key(row)?;
let pk_bytes_vec = pk_bytes.to_vec();
@@ -231,7 +234,7 @@ impl<'a> Lookuper<'a> {
Some(id) => Some(id),
None => {
// Partition doesn't exist, return empty result (like Java)
- return Ok(LookupResult::empty(self.table_info.row_type()));
+ return Ok(LookupResult::empty(Arc::clone(&self.row_type)));
}
}
} else {
@@ -266,8 +269,7 @@ impl<'a> Lookuper<'a> {
),
})?;
- let connections = self.conn.get_connections();
- let connection = connections.get_connection(tablet_server).await?;
+ let connection = self.rpc_client.get_connection(tablet_server).await?;
// Send lookup request
let request = LookupRequest::new(table_id, partition_id, bucket_id,
vec![pk_bytes_vec]);
@@ -294,10 +296,10 @@ impl<'a> Lookuper<'a> {
.filter_map(|pb_value| pb_value.values)
.collect();
- return Ok(LookupResult::new(rows, self.table_info.row_type()));
+ return Ok(LookupResult::new(rows, Arc::clone(&self.row_type)));
}
- Ok(LookupResult::empty(self.table_info.row_type()))
+ Ok(LookupResult::empty(Arc::clone(&self.row_type)))
}
/// Returns a reference to the table info.
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 6d54933..37e9b45 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -112,14 +112,14 @@ impl<'a> FlussTable<'a> {
/// println!("Found value: {:?}", value);
/// }
/// ```
- pub fn new_lookup(&self) -> Result<TableLookup<'_>> {
+ pub fn new_lookup(&self) -> Result<TableLookup> {
if !self.has_primary_key {
return Err(Error::UnsupportedOperation {
message: "Lookup is only supported for primary key
tables".to_string(),
});
}
Ok(TableLookup::new(
- self.conn,
+ self.conn.get_connections(),
self.table_info.clone(),
self.metadata.clone(),
))
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 422f9d3..aa9fca4 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -97,7 +97,7 @@ impl<'a> TableScan<'a> {
/// .column("col1", DataTypes::int())
/// .column("col2", DataTypes::string())
/// .column("col3", DataTypes::string())
- /// .column("col3", DataTypes::string())
+ /// .column("col4", DataTypes::string())
/// .build()?,
/// ).build()?;
/// let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
@@ -179,7 +179,6 @@ impl<'a> TableScan<'a> {
/// let admin = conn.get_admin().await?;
/// admin.create_table(&table_path, &table_descriptor, true)
/// .await?;
- /// let table_info = admin.get_table(&table_path).await?;
/// let table = conn.get_table(&table_path).await?;
///
/// // Project columns by column names
diff --git a/crates/fluss/src/row/encode/mod.rs
b/crates/fluss/src/row/encode/mod.rs
index d5cf8ac..1ce7aef 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -27,7 +27,7 @@ use bytes::Bytes;
/// An interface for encoding key of row into bytes.
#[allow(dead_code)]
-pub trait KeyEncoder {
+pub trait KeyEncoder: Send + Sync {
fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
}
@@ -71,7 +71,7 @@ impl KeyEncoderFactory {
/// 2. call method [`RowEncoder::encode_field()`] to write the row's field.
/// 3. call method [`RowEncoder::finishRow()`] to finish the writing and get
the written row.
#[allow(dead_code)]
-pub trait RowEncoder {
+pub trait RowEncoder: Send + Sync {
/// Start to write a new row.
///
/// # Returns