charlesdong1991 commented on code in PR #540:
URL: https://github.com/apache/fluss-rust/pull/540#discussion_r3214871985
##########
bindings/python/test/test_kv_table.py:
##########
@@ -431,3 +460,77 @@ async def test_all_supported_datatypes(connection, admin):
assert result[col] is None, f"{col} should be null"
await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_prefix_lookup_validation_errors(connection, admin):
+ """Test that prefix lookup raises errors for invalid column
configurations."""
+ table_path = fluss.TablePath("fluss", "py_test_prefix_lookup_validation")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("a", pa.int32()),
+ pa.field("b", pa.string()),
+ pa.field("c", pa.int64()),
+ ]
+ ),
+ primary_keys=["a", "b", "c"],
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema, bucket_count=3, bucket_keys=["a", "b"]
+ )
+ await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+
+ # lookup_by with columns equal to full PK should error
+ with pytest.raises(fluss.FlussError, match="prefix lookup"):
+ table.new_lookup().lookup_by(["a", "b", "c"]).create_lookuper()
+
+ # lookup_by with wrong column names should error
+ with pytest.raises(fluss.FlussError, match="bucket keys"):
+ table.new_lookup().lookup_by(["a", "c"]).create_lookuper()
+
+ # lookup_by with unknown column should error
+ with pytest.raises(fluss.FlussError, match="Unknown column name"):
+ table.new_lookup().lookup_by(["a", "missing_col"]).create_lookuper()
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+ # Partitioned table: lookup columns must include partition keys first,
+ # followed by bucket keys.
+ partitioned_table_path = fluss.TablePath("fluss",
"py_test_prefix_lookup_validation_pt")
+ await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
+
+ partitioned_schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("region", pa.string()),
+ pa.field("user_id", pa.int32()),
+ pa.field("event_id", pa.int64()),
+ ]
+ ),
+ primary_keys=["region", "user_id", "event_id"],
+ )
+ partitioned_table_descriptor = fluss.TableDescriptor(
+ partitioned_schema,
+ partition_keys=["region"],
+ bucket_count=3,
+ bucket_keys=["user_id"],
+ )
+ await admin.create_table(
+ partitioned_table_path, partitioned_table_descriptor,
ignore_if_exists=False
+ )
+
+ partitioned_table = await connection.get_table(partitioned_table_path)
+
+ # Missing partition key in lookup columns.
+ with pytest.raises(fluss.FlussError, match="partition fields"):
+ partitioned_table.new_lookup().lookup_by(["user_id"]).create_lookuper()
+
+ # After partition keys, remaining columns must equal bucket keys.
+ with pytest.raises(fluss.FlussError, match="bucket keys"):
+ partitioned_table.new_lookup().lookup_by(["region",
"event_id"]).create_lookuper()
Review Comment:
nice one, added!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org