fresh-borzoni commented on code in PR #540:
URL: https://github.com/apache/fluss-rust/pull/540#discussion_r3214816659
##########
bindings/python/test/test_kv_table.py:
##########
@@ -431,3 +460,77 @@ async def test_all_supported_datatypes(connection, admin):
assert result[col] is None, f"{col} should be null"
await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_prefix_lookup_validation_errors(connection, admin):
+ """Test that prefix lookup raises errors for invalid column
configurations."""
+ table_path = fluss.TablePath("fluss", "py_test_prefix_lookup_validation")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("a", pa.int32()),
+ pa.field("b", pa.string()),
+ pa.field("c", pa.int64()),
+ ]
+ ),
+ primary_keys=["a", "b", "c"],
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema, bucket_count=3, bucket_keys=["a", "b"]
+ )
+ await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+
+ # lookup_by with columns equal to full PK should error
+ with pytest.raises(fluss.FlussError, match="prefix lookup"):
+ table.new_lookup().lookup_by(["a", "b", "c"]).create_lookuper()
+
+ # lookup_by with wrong column names should error
+ with pytest.raises(fluss.FlussError, match="bucket keys"):
+ table.new_lookup().lookup_by(["a", "c"]).create_lookuper()
+
+ # lookup_by with unknown column should error
+ with pytest.raises(fluss.FlussError, match="Unknown column name"):
+ table.new_lookup().lookup_by(["a", "missing_col"]).create_lookuper()
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+ # Partitioned table: lookup columns must include partition keys first,
+ # followed by bucket keys.
+ partitioned_table_path = fluss.TablePath("fluss",
"py_test_prefix_lookup_validation_pt")
+ await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
+
+ partitioned_schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("region", pa.string()),
+ pa.field("user_id", pa.int32()),
+ pa.field("event_id", pa.int64()),
+ ]
+ ),
+ primary_keys=["region", "user_id", "event_id"],
+ )
+ partitioned_table_descriptor = fluss.TableDescriptor(
+ partitioned_schema,
+ partition_keys=["region"],
+ bucket_count=3,
+ bucket_keys=["user_id"],
+ )
+ await admin.create_table(
+ partitioned_table_path, partitioned_table_descriptor,
ignore_if_exists=False
+ )
+
+ partitioned_table = await connection.get_table(partitioned_table_path)
+
+ # Missing partition key in lookup columns.
+ with pytest.raises(fluss.FlussError, match="partition fields"):
+ partitioned_table.new_lookup().lookup_by(["user_id"]).create_lookuper()
+
+ # After partition keys, remaining columns must equal bucket keys.
+ with pytest.raises(fluss.FlussError, match="bucket keys"):
+ partitioned_table.new_lookup().lookup_by(["region",
"event_id"]).create_lookuper()
Review Comment:
nit: Let's also add a happy-path assertion for a prefix lookup against a
non-existent partition - core now returns an empty result in that case, so
the Python test should assert the same behavior.
##########
bindings/python/test/test_kv_table.py:
##########
@@ -109,47 +110,75 @@ async def test_composite_primary_keys(connection, admin):
pa.field("region", pa.string()),
pa.field("score", pa.int64()),
pa.field("user_id", pa.int32()),
+ pa.field("event_id", pa.int64()),
]
),
- primary_keys=["region", "user_id"],
+ primary_keys=["region", "user_id", "event_id"],
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema, bucket_count=3, bucket_keys=["region", "user_id"]
)
- table_descriptor = fluss.TableDescriptor(schema)
await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
table = await connection.get_table(table_path)
upsert_writer = table.new_upsert().create_writer()
test_data = [
- ("US", 1, 100),
- ("US", 2, 200),
- ("EU", 1, 150),
- ("EU", 2, 250),
+ ("US", 1, 1, 100),
+ ("US", 1, 2, 200),
+ ("US", 2, 1, 300),
+ ("EU", 1, 1, 150),
+ ("EU", 2, 1, 250),
]
- for region, user_id, score in test_data:
- upsert_writer.upsert({"region": region, "user_id": user_id, "score":
score})
+ for region, user_id, event_id, score in test_data:
+ upsert_writer.upsert(
+ {
+ "region": region,
+ "user_id": user_id,
+ "event_id": event_id,
+ "score": score,
+ }
+ )
await upsert_writer.flush()
lookuper = table.new_lookup().create_lookuper()
- # Lookup (US, 1) -> score 100
- result = await lookuper.lookup({"region": "US", "user_id": 1})
+ # Lookup (US, 1, 1) -> score 100
+ result = await lookuper.lookup({"region": "US", "user_id": 1, "event_id":
1})
assert result is not None
assert result["score"] == 100
- # Lookup (EU, 2) -> score 250
- result = await lookuper.lookup({"region": "EU", "user_id": 2})
+ # Lookup (EU, 2, 1) -> score 250
+ result = await lookuper.lookup({"region": "EU", "user_id": 2, "event_id":
1})
assert result is not None
assert result["score"] == 250
- # Update (US, 1) score (await acknowledgment)
- handle = upsert_writer.upsert({"region": "US", "user_id": 1, "score": 500})
+ # Update (US, 1, 1) score (await acknowledgment)
+ handle = upsert_writer.upsert(
+ {"region": "US", "user_id": 1, "event_id": 1, "score": 500}
+ )
await handle.wait()
- result = await lookuper.lookup({"region": "US", "user_id": 1})
+ result = await lookuper.lookup({"region": "US", "user_id": 1, "event_id":
1})
assert result is not None
assert result["score"] == 500
+ prefix_lookuper = table.new_lookup().lookup_by(["region",
"user_id"]).create_lookuper()
+
+ # Prefix (US, 1) should match 2 rows (event_id 1 and 2)
+ rows = await prefix_lookuper.lookup({"region": "US", "user_id": 1})
+ assert len(rows) == 2
+ event_ids = sorted(row["event_id"] for row in rows)
+ assert event_ids == [1, 2]
+
+ # Also validate list/tuple prefix input
+ rows = await prefix_lookuper.lookup(["US", 1])
+ assert len(rows) == 2
+ rows = await prefix_lookuper.lookup(("EU", 2))
+ assert len(rows) == 1
+ assert rows[0]["event_id"] == 1
+
Review Comment:
nit: Could we add a quick assertion for the empty-result case - a prefix
that matches zero rows should return []?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]