Copilot commented on code in PR #290:
URL: https://github.com/apache/fluss-rust/pull/290#discussion_r2780998078
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ partitioned_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+ fluss_partitioned_kv_schema = fluss.Schema(
+ partitioned_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ partitioned_kv_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_kv_schema,
+ partition_keys=["region"],
+ )
+
+ partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(partitioned_kv_path,
partitioned_kv_descriptor, False)
+ print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+ # Create partitions
+ await admin.create_partition(partitioned_kv_path, {"region": "US"})
+ await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+ await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
Review Comment:
`create_partition` is called without `ignore_if_exists=True`, so a
partial/previous run (or concurrent run) can cause this example to raise on
partition creation. Using `ignore_if_exists=True` (as done earlier in this
file) makes the demo rerunnable.
```suggestion
await admin.create_partition(partitioned_kv_path, {"region": "US"},
ignore_if_exists=True)
await admin.create_partition(partitioned_kv_path, {"region": "EU"},
ignore_if_exists=True)
await admin.create_partition(partitioned_kv_path, {"region": "APAC"},
ignore_if_exists=True)
```
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ partitioned_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+ fluss_partitioned_kv_schema = fluss.Schema(
+ partitioned_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ partitioned_kv_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_kv_schema,
+ partition_keys=["region"],
+ )
+
+ partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(partitioned_kv_path,
partitioned_kv_descriptor, False)
+ print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+ # Create partitions
+ await admin.create_partition(partitioned_kv_path, {"region": "US"})
+ await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+ await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+ print("Created partitions: US, EU, APAC")
+
+ partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+ upsert_writer = partitioned_kv_table.new_upsert()
+
+ # Upsert rows across partitions
+ test_data = [
+ ("US", 1, "Gustave", 100),
+ ("US", 2, "Lune", 200),
+ ("EU", 1, "Sciel", 150),
+ ("EU", 2, "Maelle", 250),
+ ("APAC", 1, "Noco", 300),
+ ]
+ for region, user_id, name, score in test_data:
+ upsert_writer.upsert({
+ "region": region, "user_id": user_id,
+ "name": name, "score": score,
+ })
+ await upsert_writer.flush()
+ print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+ # Lookup all rows across partitions
+ print("\n--- Lookup across partitions ---")
+ lookuper = partitioned_kv_table.new_lookup()
+ for region, user_id, name, score in test_data:
+ result = await lookuper.lookup({"region": region, "user_id": user_id})
+ assert result is not None, f"Expected to find region={region} user_id={user_id}"
+ assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
+ assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
Review Comment:
This example uses `assert` for runtime verification. Python drops asserts
when run with `-O`, so these checks may silently disappear. Prefer explicit
checks that raise (e.g., `if ...: raise AssertionError(...)`) so the example
behaves consistently.
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ partitioned_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+ fluss_partitioned_kv_schema = fluss.Schema(
+ partitioned_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ partitioned_kv_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_kv_schema,
+ partition_keys=["region"],
+ )
+
+ partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(partitioned_kv_path,
partitioned_kv_descriptor, False)
Review Comment:
`create_table(..., False)` makes the demo fail on reruns if the previous run
exited before cleanup (table already exists). Consider setting
`ignore_if_exists=True` here (matching the Rust example and other parts of this
file) so the example is more robust/idempotent.
```suggestion
await admin.create_table(partitioned_kv_path,
partitioned_kv_descriptor, True)
```
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ partitioned_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+ fluss_partitioned_kv_schema = fluss.Schema(
+ partitioned_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ partitioned_kv_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_kv_schema,
+ partition_keys=["region"],
+ )
+
+ partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(partitioned_kv_path,
partitioned_kv_descriptor, False)
+ print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+ # Create partitions
+ await admin.create_partition(partitioned_kv_path, {"region": "US"})
+ await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+ await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+ print("Created partitions: US, EU, APAC")
+
+ partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+ upsert_writer = partitioned_kv_table.new_upsert()
+
+ # Upsert rows across partitions
+ test_data = [
+ ("US", 1, "Gustave", 100),
+ ("US", 2, "Lune", 200),
+ ("EU", 1, "Sciel", 150),
+ ("EU", 2, "Maelle", 250),
+ ("APAC", 1, "Noco", 300),
+ ]
+ for region, user_id, name, score in test_data:
+ upsert_writer.upsert({
+ "region": region, "user_id": user_id,
+ "name": name, "score": score,
+ })
+ await upsert_writer.flush()
+ print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+ # Lookup all rows across partitions
+ print("\n--- Lookup across partitions ---")
+ lookuper = partitioned_kv_table.new_lookup()
+ for region, user_id, name, score in test_data:
+ result = await lookuper.lookup({"region": region, "user_id": user_id})
+ assert result is not None, f"Expected to find region={region} user_id={user_id}"
+ assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
+ assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
+ print(f"All {len(test_data)} rows verified across partitions")
+
+ # Update within a partition
+ print("\n--- Update within partition ---")
+ handle = upsert_writer.upsert({
+ "region": "US", "user_id": 1,
+ "name": "Gustave Updated", "score": 999,
+ })
+ await handle.wait()
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
Review Comment:
After the update, `result` is indexed without first verifying it is
non-`None`. If the lookup fails, this will raise a confusing `TypeError` rather
than a clear failure. Add an explicit `None` check (with a helpful message)
before accessing fields.
```suggestion
result = await lookuper.lookup({"region": "US", "user_id": 1})
assert result is not None, "Expected to find region=US user_id=1 after update"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]