Copilot commented on code in PR #64610:
URL: https://github.com/apache/airflow/pull/64610#discussion_r3025330355


##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -685,6 +686,8 @@ def get(
         common_params["ascending"] = ascending
         if limit:
             common_params["limit"] = limit
+        if partition_key:

Review Comment:
   `partition_key` is only forwarded when truthy (`if partition_key:`), so an 
explicitly provided empty regex pattern ("") is silently dropped and the 
request becomes unfiltered. Use an explicit `is not None` check (and consider 
stripping/validating if empty patterns should be rejected) so caller intent is 
preserved.
   ```suggestion
           if partition_key is not None:
   ```
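For illustration, a minimal sketch of the intended behavior (the `build_params` helper is hypothetical, not the actual client code):

```python
def build_params(limit=None, partition_key=None):
    """Hypothetical stand-in for the client-side query-param assembly."""
    params = {}
    if limit:
        params["limit"] = limit
    # An `is not None` check preserves an explicitly passed empty
    # pattern, which a truthiness check would silently drop.
    if partition_key is not None:
        params["partition_key"] = partition_key
    return params
```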



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -102,6 +105,8 @@ def get_asset_event_by_asset_name_uri(
         where_clause = and_(where_clause, AssetEvent.timestamp >= after)
     if before:
         where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+    if partition_key:
+        where_clause = and_(where_clause, AssetEvent.partition_key.regexp_match(partition_key))

Review Comment:
   This endpoint applies `regexp_match` when `partition_key` is provided, but 
regex matching is documented as unsupported on SQLite. Currently this will 
likely surface as a database error/500 on SQLite deployments. Consider checking 
the session dialect and returning a clear 400/501 when `partition_key` is used 
against SQLite.
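One possible guard, sketched as a plain helper (in the route you would obtain the dialect name via `session.get_bind().dialect.name`; the helper name and error wording here are illustrative):

```python
UNSUPPORTED_REGEX_DIALECTS = {"sqlite"}

def partition_key_filter_error(dialect_name, partition_key):
    """Return an (http_status, detail) tuple if the regex filter cannot be
    applied on this backend, or None if it is safe to proceed."""
    if partition_key is not None and dialect_name in UNSUPPORTED_REGEX_DIALECTS:
        return 400, f"partition_key filtering is not supported on {dialect_name}"
    return None
```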



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -102,6 +105,8 @@ def get_asset_event_by_asset_name_uri(
         where_clause = and_(where_clause, AssetEvent.timestamp >= after)
     if before:
         where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+    if partition_key:
+        where_clause = and_(where_clause, AssetEvent.partition_key.regexp_match(partition_key))
 
     return _get_asset_events_through_sql_clauses(

Review Comment:
   Invalid regex patterns (e.g. unbalanced parentheses) will raise a database 
error during query execution and likely return a 500. Consider catching 
DBAPIError/StatementError around the query when `partition_key` is used and 
turning it into a 400 with a helpful message indicating the pattern is invalid 
for the current backend.
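A cheap first-pass check could compile the pattern with Python's `re` before querying. This only approximates backend behavior (backend regex dialects, e.g. POSIX on Postgres, are not identical to Python's), so the DBAPIError/StatementError handling around the query would still be the authoritative check:

```python
import re

def invalid_pattern_detail(pattern):
    """Return a 400-style detail string for obviously malformed patterns,
    or None if the pattern compiles under Python's re syntax."""
    try:
        re.compile(pattern)
    except re.error as exc:
        return f"invalid partition_key pattern: {exc}"
    return None
```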



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -328,6 +330,7 @@ def get_asset_events(
             source_task_id,
             source_run_id,
             source_map_index,
+            partition_key_pattern,
             name_pattern,
             timestamp_range,
         ],

Review Comment:
   `partition_key_pattern` is now exposed on the public REST API, but regex 
matching is documented as unsupported on SQLite. Without an explicit dialect 
check, SQLite deployments will likely hit a DB error/500 when this filter is 
used. Consider rejecting the parameter with a clear HTTP error when the backend 
is SQLite (session is available here).



##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py:
##########
@@ -521,3 +521,199 @@ def test_get_by_asset(self, client):
                 },
             ]
         }
+
+
+class TestGetAssetEventByAssetPartitionKey:
+    """Tests for partition_key regex filter on execution API.
+
+    regexp_match is not supported on SQLite; run with --backend postgres.
+    """
+
+    @pytest.fixture
+    def test_partitioned_events(self, session, test_asset):
+        def make_timestamp(day):
+            return datetime(2021, 1, day, tzinfo=timezone.utc)
+
+        common = {
+            "asset_id": 1,
+            "extra": {"foo": "bar"},
+            "source_dag_id": "foo",
+            "source_task_id": "bar",
+            "source_run_id": "custom",
+            "source_map_index": -1,
+        }
+
+        events = [
+            AssetEvent(id=10, timestamp=make_timestamp(1), partition_key="2024-01-01", **common),
+            AssetEvent(id=11, timestamp=make_timestamp(2), partition_key="2024-01-02", **common),
+            AssetEvent(id=12, timestamp=make_timestamp(3), partition_key="us|2024-01-01", **common),
+            AssetEvent(id=13, timestamp=make_timestamp(4), partition_key="eu|2024-01-01", **common),
+            AssetEvent(id=14, timestamp=make_timestamp(5), partition_key="apac|2024-03-20", **common),
+            AssetEvent(id=15, timestamp=make_timestamp(6), partition_key=None, **common),
+        ]
+        session.add_all(events)
+        session.commit()
+        yield events
+
+        for event in events:
+            session.delete(event)
+        session.commit()
+
+    @pytest.mark.backend("postgres")
+    @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+    def test_get_by_asset_with_exact_partition_key(self, client):
+        response = client.get(
+            "/execution/asset-events/by-asset",
+            params={"name": "test_get_asset_by_name", "partition_key": "^2024-01-01$"},
+        )
+        assert response.status_code == 200
+        data = response.json()
+        assert len(data["asset_events"]) == 1
+        assert data["asset_events"][0]["partition_key"] == "2024-01-01"
+
+    @pytest.mark.backend("postgres")
+    @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+    def test_get_by_asset_with_prefix_partition_key(self, client):
+        response = client.get(
+            "/execution/asset-events/by-asset",
+            params={"name": "test_get_asset_by_name", "partition_key": "^2024-01-"},
+        )
+        assert response.status_code == 200
+        data = response.json()
+        assert len(data["asset_events"]) == 2
+
+    @pytest.mark.backend("postgres")
+    @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+    def test_get_by_asset_with_composite_partition_key_multiple_regions(self, client):
+        response = client.get(
+            "/execution/asset-events/by-asset",
+            params={
+                "name": "test_get_asset_by_name",
+                "partition_key": r"^(us|eu)\|2024-01-.*",
+            },
+        )
+        assert response.status_code == 200
+        data = response.json()
+        assert len(data["asset_events"]) == 2
+        keys = {e["partition_key"] for e in data["asset_events"]}
+        assert keys == {"us|2024-01-01", "eu|2024-01-01"}
+
+    @pytest.mark.backend("postgres")
+    @pytest.mark.usefixtures("test_asset", "test_partitioned_events")
+    def test_get_by_asset_with_composite_partition_key_date_across_regions(self, client):
+        response = client.get(
+            "/execution/asset-events/by-asset",
+            params={"name": "test_get_asset_by_name", "partition_key": r".*\|2024-01-01$"},

Review Comment:
   The new execution API behavior claims MySQL support (via SQLAlchemy's 
`regexp_match`), but the added tests are Postgres-only. Since regex 
semantics/operators differ by backend, it would be good to add at least one 
`@pytest.mark.backend("mysql")` test (or explicitly document/guard if MySQL 
support is not intended here).



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -82,6 +82,9 @@ def get_asset_event_by_asset_name_uri(
     before: Annotated[UtcDateTime | None, Query(description="The end of the time range")] = None,
     ascending: Annotated[bool, Query(description="Whether to sort results in ascending order")] = True,
     limit: Annotated[int | None, Query(description="The maximum number of results to return")] = None,
+    partition_key: Annotated[
+        str | None, Query(description="Regex pattern to filter by partition key")
+    ] = None,
 ) -> AssetEventsResponse:
     if name and uri:
         where_clause = and_(AssetModel.name == name, AssetModel.uri == uri)

Review Comment:
   The query parameter is named `partition_key`, but the description and 
implementation treat it as a regex pattern. This is easy to misinterpret as an 
exact-match filter and is inconsistent with the public REST API’s 
`partition_key_pattern`. Consider renaming to `partition_key_pattern` (or 
updating docs/description very explicitly) before this becomes a stable API 
surface.



##########
airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml:
##########
@@ -13494,11 +13506,6 @@ components:
         type:
           type: string
           title: Error Type
-        input:
-          title: Input
-        ctx:
-          type: object
-          title: Context
       type: object

Review Comment:
   This OpenAPI schema change removes the optional `input` and `ctx` fields 
from `ValidationError`. That’s a potentially breaking change for generated 
clients and is not mentioned in the PR description (which focuses on asset 
event partition key filtering). Please confirm this is intentional and, if not, 
regenerate/adjust the schema generation to avoid unrelated diffs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
