This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 31e9657  feat: Partitioned KV tables python example (#290)
31e9657 is described below

commit 31e9657ca62e6ab72364e928b372e83b5eb1d413
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 14:51:13 2026 +0000

    feat: Partitioned KV tables python example (#290)
---
 bindings/python/example/example.py | 106 +++++++++++++++++++++++++++++++++++++
 1 file changed, 106 insertions(+)

diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 9f8cafa..3d42353 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -802,6 +802,112 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, False)
+        print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(partitioned_kv_path, {"region": "US"})
+        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+        print("Created partitions: US, EU, APAC")
+
+        partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+        upsert_writer = partitioned_kv_table.new_upsert()
+
+        # Upsert rows across partitions
+        test_data = [
+            ("US", 1, "Gustave", 100),
+            ("US", 2, "Lune", 200),
+            ("EU", 1, "Sciel", 150),
+            ("EU", 2, "Maelle", 250),
+            ("APAC", 1, "Noco", 300),
+        ]
+        for region, user_id, name, score in test_data:
+            upsert_writer.upsert({
+                "region": region, "user_id": user_id,
+                "name": name, "score": score,
+            })
+        await upsert_writer.flush()
+        print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+        # Lookup all rows across partitions
+        print("\n--- Lookup across partitions ---")
+        lookuper = partitioned_kv_table.new_lookup()
+        for region, user_id, name, score in test_data:
+            result = await lookuper.lookup({"region": region, "user_id": 
user_id})
+            assert result is not None, f"Expected to find region={region} 
user_id={user_id}"
+            assert result["name"] == name, f"Name mismatch: {result['name']} 
!= {name}"
+            assert result["score"] == score, f"Score mismatch: 
{result['score']} != {score}"
+        print(f"All {len(test_data)} rows verified across partitions")
+
+        # Update within a partition
+        print("\n--- Update within partition ---")
+        handle = upsert_writer.upsert({
+            "region": "US", "user_id": 1,
+            "name": "Gustave Updated", "score": 999,
+        })
+        await handle.wait()
+        result = await lookuper.lookup({"region": "US", "user_id": 1})
+        assert result is not None, "Expected to find region=US user_id=1 after 
update"
+        assert result["name"] == "Gustave Updated"
+        assert result["score"] == 999
+        print(f"Update verified: US/1 name={result['name']} 
score={result['score']}")
+
+        # Lookup in non-existent partition
+        print("\n--- Lookup in non-existent partition ---")
+        result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
+        assert result is None, "Expected UNKNOWN partition lookup to return 
None"
+        print("UNKNOWN partition lookup: not found (expected)")
+
+        # Delete within a partition
+        print("\n--- Delete within partition ---")
+        handle = upsert_writer.delete({"region": "EU", "user_id": 1})
+        await handle.wait()
+        result = await lookuper.lookup({"region": "EU", "user_id": 1})
+        assert result is None, "Expected EU/1 to be deleted"
+        print("Delete verified: EU/1 not found")
+
+        # Verify sibling record still exists
+        result = await lookuper.lookup({"region": "EU", "user_id": 2})
+        assert result is not None, "Expected EU/2 to still exist"
+        assert result["name"] == "Maelle"
+        print(f"EU/2 still exists: name={result['name']}")
+
+        # Cleanup
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        print(f"\nDropped partitioned KV table: {partitioned_kv_path}")
+
+    except Exception as e:
+        print(f"Error with partitioned KV table: {e}")
+        traceback.print_exc()
+
     # Close connection
     conn.close()
     print("\nConnection closed")

Reply via email to