This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 31e9657 feat: Partitioned KV tables python example (#290)
31e9657 is described below
commit 31e9657ca62e6ab72364e928b372e83b5eb1d413
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 14:51:13 2026 +0000
feat: Partitioned KV tables python example (#290)
---
bindings/python/example/example.py | 106 +++++++++++++++++++++++++++++++++++++
1 file changed, 106 insertions(+)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 9f8cafa..3d42353 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -802,6 +802,112 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ partitioned_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+ fluss_partitioned_kv_schema = fluss.Schema(
+ partitioned_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ partitioned_kv_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_kv_schema,
+ partition_keys=["region"],
+ )
+
+ partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False)
+ print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+ # Create partitions
+ await admin.create_partition(partitioned_kv_path, {"region": "US"})
+ await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+ await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+ print("Created partitions: US, EU, APAC")
+
+ partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+ upsert_writer = partitioned_kv_table.new_upsert()
+
+ # Upsert rows across partitions
+ test_data = [
+ ("US", 1, "Gustave", 100),
+ ("US", 2, "Lune", 200),
+ ("EU", 1, "Sciel", 150),
+ ("EU", 2, "Maelle", 250),
+ ("APAC", 1, "Noco", 300),
+ ]
+ for region, user_id, name, score in test_data:
+ upsert_writer.upsert({
+ "region": region, "user_id": user_id,
+ "name": name, "score": score,
+ })
+ await upsert_writer.flush()
+ print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+ # Lookup all rows across partitions
+ print("\n--- Lookup across partitions ---")
+ lookuper = partitioned_kv_table.new_lookup()
+ for region, user_id, name, score in test_data:
+ result = await lookuper.lookup({"region": region, "user_id": user_id})
+ assert result is not None, f"Expected to find region={region} user_id={user_id}"
+ assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
+ assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
+ print(f"All {len(test_data)} rows verified across partitions")
+
+ # Update within a partition
+ print("\n--- Update within partition ---")
+ handle = upsert_writer.upsert({
+ "region": "US", "user_id": 1,
+ "name": "Gustave Updated", "score": 999,
+ })
+ await handle.wait()
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
+ assert result is not None, "Expected to find region=US user_id=1 after update"
+ assert result["name"] == "Gustave Updated"
+ assert result["score"] == 999
+ print(f"Update verified: US/1 name={result['name']} score={result['score']}")
+
+ # Lookup in non-existent partition
+ print("\n--- Lookup in non-existent partition ---")
+ result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
+ assert result is None, "Expected UNKNOWN partition lookup to return None"
+ print("UNKNOWN partition lookup: not found (expected)")
+
+ # Delete within a partition
+ print("\n--- Delete within partition ---")
+ handle = upsert_writer.delete({"region": "EU", "user_id": 1})
+ await handle.wait()
+ result = await lookuper.lookup({"region": "EU", "user_id": 1})
+ assert result is None, "Expected EU/1 to be deleted"
+ print("Delete verified: EU/1 not found")
+
+ # Verify sibling record still exists
+ result = await lookuper.lookup({"region": "EU", "user_id": 2})
+ assert result is not None, "Expected EU/2 to still exist"
+ assert result["name"] == "Maelle"
+ print(f"EU/2 still exists: name={result['name']}")
+
+ # Cleanup
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ print(f"\nDropped partitioned KV table: {partitioned_kv_path}")
+
+ except Exception as e:
+ print(f"Error with partitioned KV table: {e}")
+ traceback.print_exc()
+
# Close connection
conn.close()
print("\nConnection closed")