Copilot commented on code in PR #64611:
URL: https://github.com/apache/airflow/pull/64611#discussion_r3025321525


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -120,12 +140,22 @@ def get_asset_event_by_asset_alias(
     before: Annotated[UtcDateTime | None, Query(description="The end of the time range")] = None,
     ascending: Annotated[bool, Query(description="Whether to sort results in ascending order")] = True,
     limit: Annotated[int | None, Query(description="The maximum number of results to return")] = None,
+    extra: Annotated[
+        list[str] | None,
+        Query(
+            description="Filter by extra JSON key-value pairs. Format: key=value. Repeat for AND logic.",
+        ),
+    ] = None,
 ) -> AssetEventsResponse:
     where_clause = AssetAliasModel.name == name
     if after:
         where_clause = and_(where_clause, AssetEvent.timestamp >= after)
     if before:
         where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+    for item in extra:
+        if "=" in item:
+            k, v = item.split("=", 1)
+            where_clause = and_(where_clause, AssetEvent.extra[k].as_string() == v)

Review Comment:
   `extra` defaults to `None`, but this loop iterates over it unconditionally, 
which will raise `TypeError: 'NoneType' object is not iterable` when the query 
param isn’t provided (e.g. the no-extra-filter tests). Consider reusing 
`_parse_extra_params(extra)` and applying a single `JsonContains(...)` 
predicate (or at least iterate over `extra or []`).
   ```suggestion
       extra_dict = _parse_extra_params(extra)
       if extra_dict:
           where_clause = and_(where_clause, JsonContains(AssetEvent.extra, extra_dict))
   ```
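
To illustrate the `None` handling discussed above, here is a minimal sketch of a parser for repeated `key=value` query values. The PR's actual `_parse_extra_params` is not shown in this diff, so the function name `parse_extra_params_sketch` and its behavior (skipping items without `=`, as the loop in the diff does) are assumptions for illustration only.

```python
from __future__ import annotations


def parse_extra_params_sketch(extra: list[str] | None) -> dict[str, str]:
    """Hypothetical helper: turn repeated ``key=value`` items into a dict."""
    parsed: dict[str, str] = {}
    # ``extra or []`` makes a missing query parameter (None) behave like an empty list.
    for item in extra or []:
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = item.partition("=")
        if not sep:
            continue  # Assumption: items without "=" are ignored, as in the diff.
        parsed[key] = value
    return parsed
```

With this shape, a request without any `extra` parameter yields an empty dict, and the caller can skip adding a predicate entirely.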



##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -56,6 +58,47 @@ def get_dialect_name(session: Session) -> str | None:
     return getattr(bind.dialect, "name", None)
 
 
+class JsonContains(ColumnElement):
+    """
+    Dialect-aware JSON containment check.
+
+    Compiles to ``@>`` on PostgreSQL (GIN-indexable), ``JSON_CONTAINS`` on
+    MySQL, and per-key ``JSON_EXTRACT`` comparisons on SQLite.
+    """
+
+    inherit_cache = True
+    type = NullType()
+
+    def __init__(self, column, kv_dict: dict[str, str]):
+        self.column = column
+        self.kv_dict = kv_dict
+
+
+@compiles(JsonContains, "postgresql")
+def _pg_json_contains(element, compiler, **kw):
+    col = compiler.process(element.column, **kw)
+    val = json.dumps(element.kv_dict).replace("'", "''")
+    return f"({col})::jsonb @> '{val}'::jsonb"

Review Comment:
   `JsonContains` compiles SQL by interpolating JSON/key/value strings into the 
query text. This bypasses SQLAlchemy bind parameters and relies on manual 
escaping, which is error-prone across dialects and can enable SQL injection via 
user-controlled `extra` keys/values. Please rewrite the compilation to use 
bound parameters / SQLAlchemy expressions (e.g. Postgres 
`column.contains(bindparam(..., type_=JSONB))`, MySQL 
`func.JSON_CONTAINS(column, bindparam(...))`, SQLite `func.json_extract(column, 
bindparam(...)) == bindparam(...)`).
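
As a rough illustration of why bound parameters matter here, the sketch below uses the standard-library `sqlite3` driver rather than SQLAlchemy, so it is not the suggested `bindparam` rewrite itself, only the same principle. The table name `asset_event`, the helper `find_events`, and the sample rows are hypothetical. It also assumes the bundled SQLite has the JSON functions available and that keys contain no double quote.

```python
import json
import sqlite3


def find_events(conn: sqlite3.Connection, filters: dict) -> list:
    """Return ids of rows whose JSON ``extra`` matches every key/value in ``filters``."""
    clauses = []
    params = []
    for key, value in filters.items():
        # Both the JSON path and the expected value are passed as bound
        # parameters; nothing user-controlled is spliced into the SQL text.
        clauses.append("json_extract(extra, ?) = ?")
        params.extend([f'$."{key}"', value])
    sql = "SELECT id FROM asset_event"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY id"
    return [row[0] for row in conn.execute(sql, params)]


# Hypothetical in-memory table used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, extra TEXT)")
conn.executemany(
    "INSERT INTO asset_event (extra) VALUES (?)",
    [(json.dumps({"team": "data"}),), (json.dumps({"team": "ml"}),)],
)
```

A value such as `x' OR '1'='1` is compared as plain data and matches nothing, whereas the same string interpolated into the query text would alter the statement.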



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -685,13 +686,18 @@ def get(
         common_params["ascending"] = ascending
         if limit:

Review Comment:
   `if limit:` skips sending `limit=0`, because `0` is falsy and is therefore 
treated the same as `None`. If `0` is meant to be a valid value (e.g. to 
explicitly request no results), use an explicit `if limit is not None:` check 
here to preserve the caller’s intent.
   ```suggestion
           if limit is not None:
   ```
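
The difference between the two checks can be seen in a small standalone sketch. The helpers `build_params_truthy` and `build_params_explicit` are hypothetical and stand in for the parameter-building code in the client, which is only partly visible in this diff.

```python
from __future__ import annotations


def build_params_truthy(limit: int | None = None) -> dict[str, int]:
    """Hypothetical: mirrors the ``if limit:`` check, which drops ``limit=0``."""
    params: dict[str, int] = {}
    if limit:
        params["limit"] = limit
    return params


def build_params_explicit(limit: int | None = None) -> dict[str, int]:
    """Hypothetical: mirrors ``if limit is not None:``, which keeps ``limit=0``."""
    params: dict[str, int] = {}
    if limit is not None:
        params["limit"] = limit
    return params
```

Both versions omit the parameter when `limit` is `None`; only the explicit check forwards a caller-supplied `0`.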



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

