Copilot commented on code in PR #64610:
URL: https://github.com/apache/airflow/pull/64610#discussion_r3025330355
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -685,6 +686,8 @@ def get(
common_params["ascending"] = ascending
if limit:
common_params["limit"] = limit
+ if partition_key:
Review Comment:
`partition_key` is only forwarded when truthy (`if partition_key:`), so an
explicitly provided empty regex pattern ("") is silently dropped and the
request becomes unfiltered. Use an explicit `is not None` check (and consider
stripping/validating if empty patterns should be rejected) so caller intent is
preserved.
```suggestion
if partition_key is not None:
```
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -102,6 +105,8 @@ def get_asset_event_by_asset_name_uri(
where_clause = and_(where_clause, AssetEvent.timestamp >= after)
if before:
where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+ if partition_key:
+         where_clause = and_(where_clause, AssetEvent.partition_key.regexp_match(partition_key))
Review Comment:
This endpoint applies `regexp_match` when `partition_key` is provided, but
regex matching is documented as unsupported on SQLite. Currently this will
likely surface as a database error/500 on SQLite deployments. Consider checking
the session dialect and returning a clear 400/501 when `partition_key` is used
against SQLite.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -102,6 +105,8 @@ def get_asset_event_by_asset_name_uri(
where_clause = and_(where_clause, AssetEvent.timestamp >= after)
if before:
where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+ if partition_key:
+         where_clause = and_(where_clause, AssetEvent.partition_key.regexp_match(partition_key))
return _get_asset_events_through_sql_clauses(
Review Comment:
Invalid regex patterns (e.g. unbalanced parentheses) will raise a database
error during query execution and likely return a 500. Consider catching
DBAPIError/StatementError around the query when `partition_key` is used and
turning it into a 400 with a helpful message indicating the pattern is invalid
for the current backend.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -328,6 +330,7 @@ def get_asset_events(
source_task_id,
source_run_id,
source_map_index,
+ partition_key_pattern,
name_pattern,
timestamp_range,
],
Review Comment:
`partition_key_pattern` is now exposed on the public REST API, but regex
matching is documented as unsupported on SQLite. Without an explicit dialect
check, SQLite deployments will likely hit a DB error/500 when this filter is
used. Consider rejecting the parameter with a clear HTTP error when the backend
is SQLite (session is available here).
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py:
##########
@@ -521,3 +521,199 @@ def test_get_by_asset(self, client):
},
]
}
+
+
+class TestGetAssetEventByAssetPartitionKey:
+ """Tests for partition_key regex filter on execution API.
+
+ regexp_match is not supported on SQLite; run with --backend postgres.
+ """
+
+ @pytest.fixture
+ def test_partitioned_events(self, session, test_asset):
+ def make_timestamp(day):
+ return datetime(2021, 1, day, tzinfo=timezone.utc)
+
+ common = {
+ "asset_id": 1,
+ "extra": {"foo": "bar"},
+ "source_dag_id": "foo",
+ "source_task_id": "bar",
+ "source_run_id": "custom",
+ "source_map_index": -1,
+ }
+
+ events = [
+             AssetEvent(id=10, timestamp=make_timestamp(1), partition_key="2024-01-01", **common),
+             AssetEvent(id=11, timestamp=make_timestamp(2), partition_key="2024-01-02", **common),
+             AssetEvent(id=12, timestamp=make_timestamp(3), partition_key="us|2024-01-01", **common),
+             AssetEvent(id=13, timestamp=make_timestamp(4), partition_key="eu|2024-01-01", **common),
+             AssetEvent(id=14, timestamp=make_timestamp(5), partition_key="apac|2024-03-20", **common),
+             AssetEvent(id=15, timestamp=make_timestamp(6), partition_key=None, **common),
+ ]
+ session.add_all(events)
+ session.commit()
+ yield events
+
+ for event in events:
+ session.delete(event)
+ session.commit()
+
+ @pytest.mark.backend("postgres")
+ @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+ def test_get_by_asset_with_exact_partition_key(self, client):
+ response = client.get(
+ "/execution/asset-events/by-asset",
+             params={"name": "test_get_asset_by_name", "partition_key": "^2024-01-01$"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data["asset_events"]) == 1
+ assert data["asset_events"][0]["partition_key"] == "2024-01-01"
+
+ @pytest.mark.backend("postgres")
+ @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+ def test_get_by_asset_with_prefix_partition_key(self, client):
+ response = client.get(
+ "/execution/asset-events/by-asset",
+             params={"name": "test_get_asset_by_name", "partition_key": "^2024-01-"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data["asset_events"]) == 2
+
+ @pytest.mark.backend("postgres")
+ @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+     def test_get_by_asset_with_composite_partition_key_multiple_regions(self, client):
+ response = client.get(
+ "/execution/asset-events/by-asset",
+ params={
+ "name": "test_get_asset_by_name",
+ "partition_key": r"^(us|eu)\|2024-01-.*",
+ },
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data["asset_events"]) == 2
+ keys = {e["partition_key"] for e in data["asset_events"]}
+ assert keys == {"us|2024-01-01", "eu|2024-01-01"}
+
+ @pytest.mark.backend("postgres")
+ @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+     def test_get_by_asset_with_composite_partition_key_date_across_regions(self, client):
+ response = client.get(
+ "/execution/asset-events/by-asset",
+             params={"name": "test_get_asset_by_name", "partition_key": r".*\|2024-01-01$"},
Review Comment:
The new execution API behavior claims MySQL support (via SQLAlchemy's
`regexp_match`), but the added tests are Postgres-only. Since regex
semantics/operators differ by backend, it would be good to add at least one
`@pytest.mark.backend("mysql")` test (or explicitly document/guard if MySQL
support is not intended here).
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -82,6 +82,9 @@ def get_asset_event_by_asset_name_uri(
    before: Annotated[UtcDateTime | None, Query(description="The end of the time range")] = None,
    ascending: Annotated[bool, Query(description="Whether to sort results in ascending order")] = True,
    limit: Annotated[int | None, Query(description="The maximum number of results to return")] = None,
+     partition_key: Annotated[
+         str | None, Query(description="Regex pattern to filter by partition key")
+     ] = None,
) -> AssetEventsResponse:
if name and uri:
where_clause = and_(AssetModel.name == name, AssetModel.uri == uri)
Review Comment:
The query parameter is named `partition_key`, but the description and
implementation treat it as a regex pattern. This is easy to misinterpret as an
exact-match filter and is inconsistent with the public REST API’s
`partition_key_pattern`. Consider renaming to `partition_key_pattern` (or
updating docs/description very explicitly) before this becomes a stable API
surface.
##########
airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml:
##########
@@ -13494,11 +13506,6 @@ components:
type:
type: string
title: Error Type
- input:
- title: Input
- ctx:
- type: object
- title: Context
type: object
Review Comment:
This OpenAPI schema change removes the optional `input` and `ctx` fields
from `ValidationError`. That’s a potentially breaking change for generated
clients and is not mentioned in the PR description (which focuses on asset
event partition key filtering). Please confirm this is intentional and, if not,
regenerate/adjust the schema generation to avoid unrelated diffs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]