Copilot commented on code in PR #64611:
URL: https://github.com/apache/airflow/pull/64611#discussion_r3025321525
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -120,12 +140,22 @@ def get_asset_event_by_asset_alias(
     before: Annotated[UtcDateTime | None, Query(description="The end of the time range")] = None,
     ascending: Annotated[bool, Query(description="Whether to sort results in ascending order")] = True,
     limit: Annotated[int | None, Query(description="The maximum number of results to return")] = None,
+    extra: Annotated[
+        list[str] | None,
+        Query(
+            description="Filter by extra JSON key-value pairs. Format: key=value. Repeat for AND logic.",
+        ),
+    ] = None,
 ) -> AssetEventsResponse:
     where_clause = AssetAliasModel.name == name
     if after:
         where_clause = and_(where_clause, AssetEvent.timestamp >= after)
     if before:
         where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+    for item in extra:
+        if "=" in item:
+            k, v = item.split("=", 1)
+            where_clause = and_(where_clause, AssetEvent.extra[k].as_string() == v)
Review Comment:
`extra` defaults to `None`, but this loop iterates over it unconditionally,
which will raise `TypeError: 'NoneType' object is not iterable` when the query
param isn’t provided (e.g. the no-extra-filter tests). Consider reusing
`_parse_extra_params(extra)` and applying a single `JsonContains(...)`
predicate (or at least iterate over `extra or []`).
```suggestion
    extra_dict = _parse_extra_params(extra)
    if extra_dict:
        where_clause = and_(where_clause, JsonContains(AssetEvent.extra, extra_dict))
```
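The `None`-safe parsing this comment asks for can be sketched as follows. The implementation of `_parse_extra_params` is not shown in this thread, so `parse_extra_params` below is a hypothetical stand-in. It assumes the same `key=value` format and, like the loop in the diff, skips items that contain no `=`.

```python
from __future__ import annotations


def parse_extra_params(extra: list[str] | None) -> dict[str, str]:
    """Hypothetical stand-in for ``_parse_extra_params`` (real helper not shown)."""
    parsed: dict[str, str] = {}
    # ``extra or []`` turns a missing query parameter (None) into an empty loop
    # instead of raising TypeError.
    for item in extra or []:
        if "=" not in item:
            # Assumption: malformed items are skipped, as in the original loop.
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, value = item.split("=", 1)
        parsed[key] = value
    return parsed
```

An empty result means no filter was requested, which is why the suggestion guards the predicate with `if extra_dict:`.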
##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -56,6 +58,47 @@ def get_dialect_name(session: Session) -> str | None:
     return getattr(bind.dialect, "name", None)
+class JsonContains(ColumnElement):
+    """
+    Dialect-aware JSON containment check.
+
+    Compiles to ``@>`` on PostgreSQL (GIN-indexable), ``JSON_CONTAINS`` on
+    MySQL, and per-key ``JSON_EXTRACT`` comparisons on SQLite.
+    """
+
+    inherit_cache = True
+    type = NullType()
+
+    def __init__(self, column, kv_dict: dict[str, str]):
+        self.column = column
+        self.kv_dict = kv_dict
+
+
+@compiles(JsonContains, "postgresql")
+def _pg_json_contains(element, compiler, **kw):
+    col = compiler.process(element.column, **kw)
+    val = json.dumps(element.kv_dict).replace("'", "''")
+    return f"({col})::jsonb @> '{val}'::jsonb"
Review Comment:
`JsonContains` compiles SQL by interpolating JSON/key/value strings into the
query text. This bypasses SQLAlchemy bind parameters and relies on manual
escaping, which is error-prone across dialects and can enable SQL injection via
user-controlled `extra` keys/values. Please rewrite the compilation to use
bound parameters / SQLAlchemy expressions (e.g. Postgres
`column.contains(bindparam(..., type_=JSONB))`, MySQL
`func.JSON_CONTAINS(column, bindparam(...))`, SQLite `func.json_extract(column,
bindparam(...)) == bindparam(...)`).
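
To illustrate the principle behind this request, here is a minimal sketch using the standard library's `sqlite3` module instead of SQLAlchemy. It is not the PR's code. The table name `asset_event`, the function `find_events`, and the sample rows are assumptions made for the example. The sketch also assumes SQLite's JSON functions are available and that keys contain no double quote.

```python
from __future__ import annotations

import json
import sqlite3


def find_events(conn: sqlite3.Connection, filters: dict[str, str]) -> list[int]:
    """Return ids of rows whose JSON ``extra`` column matches every key/value pair."""
    clauses: list[str] = []
    params: list[str] = []
    for key, value in filters.items():
        # Both the JSON path and the expected value travel as bound parameters,
        # so user input is never spliced into the SQL text.
        clauses.append("json_extract(extra, ?) = ?")
        # Assumption: keys contain no double quote; a real implementation
        # would have to reject or escape such keys.
        params.extend([f'$."{key}"', value])
    # The WHERE text is assembled only from the fixed fragment above.
    where = " AND ".join(clauses) or "1 = 1"
    rows = conn.execute(f"SELECT id FROM asset_event WHERE {where} ORDER BY id", params)
    return [row[0] for row in rows]


# Hypothetical sample data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, extra TEXT)")
conn.executemany(
    "INSERT INTO asset_event (id, extra) VALUES (?, ?)",
    [
        (1, json.dumps({"team": "data", "env": "prod"})),
        (2, json.dumps({"team": "data", "env": "dev"})),
        (3, json.dumps({"team": "o'brien"})),
    ],
)
```

Because the values are bound, a quote in user input is treated as data. For example, `find_events(conn, {"team": "x' OR '1'='1"})` matches nothing and does not alter the query.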
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -685,13 +686,18 @@ def get(
common_params["ascending"] = ascending
if limit:
Review Comment:
`if limit:` skips sending `limit=0`, because `0` is falsy. If `0` is meant
to be a valid value (e.g. to explicitly request no results), use an explicit
`if limit is not None:` check here to preserve the caller's intent.
```suggestion
if limit is not None:
```
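A small sketch of how the two checks differ. `build_params` is a hypothetical helper written for illustration, not the client's actual `get` method.

```python
from __future__ import annotations


def build_params(limit: int | None = None) -> dict[str, int]:
    """Build query parameters, keeping an explicit ``limit=0``."""
    params: dict[str, int] = {}
    # ``is not None`` distinguishes "not provided" from a deliberate zero;
    # a plain truthiness check (``if limit:``) would drop ``limit=0``.
    if limit is not None:
        params["limit"] = limit
    return params
```

With this check, `build_params(0)` keeps the `limit` key, while `build_params()` omits it.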