Copilot commented on code in PR #290:
URL: https://github.com/apache/fluss-rust/pull/290#discussion_r2780998078


##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, False)
+        print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(partitioned_kv_path, {"region": "US"})
+        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})

Review Comment:
   `create_partition` is called without `ignore_if_exists=True`, so a 
partial/previous run (or concurrent run) can cause this example to raise on 
partition creation. Using `ignore_if_exists=True` (as done earlier in this 
file) makes the demo rerunnable.
   ```suggestion
           await admin.create_partition(partitioned_kv_path, {"region": "US"}, 
ignore_if_exists=True)
           await admin.create_partition(partitioned_kv_path, {"region": "EU"}, 
ignore_if_exists=True)
           await admin.create_partition(partitioned_kv_path, {"region": 
"APAC"}, ignore_if_exists=True)
   ```



##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, False)
+        print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(partitioned_kv_path, {"region": "US"})
+        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+        print("Created partitions: US, EU, APAC")
+
+        partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+        upsert_writer = partitioned_kv_table.new_upsert()
+
+        # Upsert rows across partitions
+        test_data = [
+            ("US", 1, "Gustave", 100),
+            ("US", 2, "Lune", 200),
+            ("EU", 1, "Sciel", 150),
+            ("EU", 2, "Maelle", 250),
+            ("APAC", 1, "Noco", 300),
+        ]
+        for region, user_id, name, score in test_data:
+            upsert_writer.upsert({
+                "region": region, "user_id": user_id,
+                "name": name, "score": score,
+            })
+        await upsert_writer.flush()
+        print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+        # Lookup all rows across partitions
+        print("\n--- Lookup across partitions ---")
+        lookuper = partitioned_kv_table.new_lookup()
+        for region, user_id, name, score in test_data:
+            result = await lookuper.lookup({"region": region, "user_id": 
user_id})
+            assert result is not None, f"Expected to find region={region} 
user_id={user_id}"
+            assert result["name"] == name, f"Name mismatch: {result['name']} 
!= {name}"
+            assert result["score"] == score, f"Score mismatch: 
{result['score']} != {score}"

Review Comment:
   This example uses `assert` for runtime verification. Python drops asserts 
when run with `-O`, so these checks may silently disappear. Prefer explicit 
checks that raise (e.g., `if ...: raise AssertionError(...)`) so the example 
behaves consistently.



##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, False)

Review Comment:
   `create_table(..., False)` makes the demo fail on reruns if the previous run 
exited before cleanup (table already exists). Consider setting 
`ignore_if_exists=True` here (matching the Rust example and other parts of this 
file) so the example is more robust/idempotent.
   ```suggestion
           await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, True)
   ```



##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, 
partitioned_kv_descriptor, False)
+        print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(partitioned_kv_path, {"region": "US"})
+        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+        print("Created partitions: US, EU, APAC")
+
+        partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+        upsert_writer = partitioned_kv_table.new_upsert()
+
+        # Upsert rows across partitions
+        test_data = [
+            ("US", 1, "Gustave", 100),
+            ("US", 2, "Lune", 200),
+            ("EU", 1, "Sciel", 150),
+            ("EU", 2, "Maelle", 250),
+            ("APAC", 1, "Noco", 300),
+        ]
+        for region, user_id, name, score in test_data:
+            upsert_writer.upsert({
+                "region": region, "user_id": user_id,
+                "name": name, "score": score,
+            })
+        await upsert_writer.flush()
+        print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+        # Lookup all rows across partitions
+        print("\n--- Lookup across partitions ---")
+        lookuper = partitioned_kv_table.new_lookup()
+        for region, user_id, name, score in test_data:
+            result = await lookuper.lookup({"region": region, "user_id": 
user_id})
+            assert result is not None, f"Expected to find region={region} 
user_id={user_id}"
+            assert result["name"] == name, f"Name mismatch: {result['name']} 
!= {name}"
+            assert result["score"] == score, f"Score mismatch: 
{result['score']} != {score}"
+        print(f"All {len(test_data)} rows verified across partitions")
+
+        # Update within a partition
+        print("\n--- Update within partition ---")
+        handle = upsert_writer.upsert({
+            "region": "US", "user_id": 1,
+            "name": "Gustave Updated", "score": 999,
+        })
+        await handle.wait()
+        result = await lookuper.lookup({"region": "US", "user_id": 1})

Review Comment:
   After the update, `result` is indexed without first verifying it is 
non-`None`. If the lookup fails, this will raise a confusing `TypeError` rather 
than a clear failure. Add an explicit `None` check (with a helpful message) 
before accessing fields.
   ```suggestion
           result = await lookuper.lookup({"region": "US", "user_id": 1})
           assert result is not None, "Expected to find region=US user_id=1 
after update"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to