Copilot commented on code in PR #64611:
URL: https://github.com/apache/airflow/pull/64611#discussion_r3025291586


##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -56,6 +58,47 @@ def get_dialect_name(session: Session) -> str | None:
     return getattr(bind.dialect, "name", None)
 
 
+class JsonContains(ColumnElement):
+    """
+    Dialect-aware JSON containment check.
+
+    Compiles to ``@>`` on PostgreSQL (GIN-indexable), ``JSON_CONTAINS`` on
+    MySQL, and per-key ``JSON_EXTRACT`` comparisons on SQLite.
+    """
+
+    inherit_cache = True
+    type = NullType()
+
+    def __init__(self, column, kv_dict: dict[str, str]):
+        self.column = column
+        self.kv_dict = kv_dict
+
+
+@compiles(JsonContains, "postgresql")
+def _pg_json_contains(element, compiler, **kw):
+    col = compiler.process(element.column, **kw)
+    val = json.dumps(element.kv_dict).replace("'", "''")
+    return f"({col})::jsonb @> '{val}'::jsonb"
+
+
+@compiles(JsonContains, "mysql")
+def _mysql_json_contains(element, compiler, **kw):
+    col = compiler.process(element.column, **kw)
+    val = json.dumps(element.kv_dict).replace("'", "''")
+    return f"JSON_CONTAINS({col}, '{val}') = 1"
+
+
+@compiles(JsonContains)
+def _default_json_contains(element, compiler, **kw):
+    col = compiler.process(element.column, **kw)
+    clauses = []
+    for k, v in element.kv_dict.items():
+        safe_k = k.replace("'", "''")
+        safe_v = v.replace("'", "''")
+        clauses.append(f"JSON_EXTRACT({col}, '$.{safe_k}') = '{safe_v}'")
+    return "(" + " AND ".join(clauses) + ")"

Review Comment:
   The `@compiles` implementations build SQL by interpolating JSON and 
user-provided keys/values into string literals (e.g. `... @> '{val}'::jsonb`, 
`JSON_EXTRACT(... '$.{safe_k}') = '{safe_v}'`). Escaping only single quotes is 
not sufficient across dialects (e.g. MySQL string literals treat backslash as 
an escape), and this pattern bypasses bound parameters, increasing risk of 
malformed SQL and injection. Prefer SQLAlchemy expression APIs with bind 
parameters (e.g. Postgres JSONB containment via 
`.cast(JSONB).contains(<bindparam JSON>)`, MySQL `func.JSON_CONTAINS(col, 
<bindparam>)`, SQLite `func.json_extract(col, <bindparam path>) == <bindparam 
value>`).



##########
airflow-core/src/airflow/migrations/versions/0111_3_3_0_add_gin_index_on_asset_event_extra.py:
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add GIN index on asset_event.extra for PostgreSQL.
+
+Revision ID: 5a5d3253e946
+Revises: a4c2d171ae18
+Create Date: 2026-04-01 23:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = "5a5d3253e946"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Add GIN index on asset_event.extra for PostgreSQL only."""
+    conn = op.get_bind()
+    if conn.dialect.name == "postgresql":
+        op.execute(
+            text(
+                "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_asset_event_extra_gin "
+                "ON asset_event USING GIN (extra jsonb_ops)"
+            )
+        )
+
+
+def downgrade():
+    """Remove GIN index on asset_event.extra."""
+    conn = op.get_bind()
+    if conn.dialect.name == "postgresql":
+        op.execute(text("DROP INDEX CONCURRENTLY IF EXISTS idx_asset_event_extra_gin"))

Review Comment:
   This migration uses `CREATE INDEX CONCURRENTLY` / `DROP INDEX CONCURRENTLY`, 
but Airflow’s Alembic env runs each migration inside a transaction 
(`transaction_per_migration=True`, see `airflow/migrations/env.py:121-132`). 
Postgres will error because `CONCURRENTLY` cannot run inside a transaction 
block. Use an autocommit block (Alembic `op.get_context().autocommit_block()`), 
or avoid `CONCURRENTLY` and rely on standard `op.create_index` (or Alembic’s 
`postgresql_concurrently=True` if available in this repo’s Alembic version).
   ```suggestion
           context = op.get_context()
           with context.autocommit_block():
               op.execute(
                   text(
                       "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_asset_event_extra_gin "
                       "ON asset_event USING GIN (extra jsonb_ops)"
                   )
               )
   
   
   def downgrade():
       """Remove GIN index on asset_event.extra."""
       conn = op.get_bind()
       if conn.dialect.name == "postgresql":
           context = op.get_context()
           with context.autocommit_block():
               op.execute(
                   text("DROP INDEX CONCURRENTLY IF EXISTS idx_asset_event_extra_gin")
               )
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -120,12 +140,22 @@ def get_asset_event_by_asset_alias(
     before: Annotated[UtcDateTime | None, Query(description="The end of the time range")] = None,
     ascending: Annotated[bool, Query(description="Whether to sort results in ascending order")] = True,
     limit: Annotated[int | None, Query(description="The maximum number of results to return")] = None,
+    extra: Annotated[
+        list[str] | None,
+        Query(
+            description="Filter by extra JSON key-value pairs. Format: key=value. Repeat for AND logic.",
+        ),
+    ] = None,
 ) -> AssetEventsResponse:
     where_clause = AssetAliasModel.name == name
     if after:
         where_clause = and_(where_clause, AssetEvent.timestamp >= after)
     if before:
         where_clause = and_(where_clause, AssetEvent.timestamp <= before)
+    for item in extra:

Review Comment:
   `extra` defaults to `None`, but this function iterates `for item in extra:` 
which will raise a `TypeError` when the query param is omitted. Consider making 
`extra` a `list[str]` with `Query(default_factory=list, ...)` (so it’s always 
iterable) or iterate over `(extra or [])` before splitting.
   ```suggestion
       for item in extra or []:
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml:
##########
@@ -13494,11 +13506,6 @@ components:
         type:
           type: string
           title: Error Type
-        input:
-          title: Input
-        ctx:
-          type: object
-          title: Context
       type: object
       required:
       - loc

Review Comment:
   The generated OpenAPI schema removes `ValidationError.input` and 
`ValidationError.ctx` from the components model. This is not mentioned in the 
PR description and could be a breaking change for any clients relying on those 
fields. Please confirm this change is intentional (e.g. due to a 
dependency/generator update) and, if not, regenerate/adjust the spec to avoid 
unrelated contract changes in this PR.



##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -56,6 +58,47 @@ def get_dialect_name(session: Session) -> str | None:
     return getattr(bind.dialect, "name", None)
 
 
+class JsonContains(ColumnElement):
+    """
+    Dialect-aware JSON containment check.
+
+    Compiles to ``@>`` on PostgreSQL (GIN-indexable), ``JSON_CONTAINS`` on
+    MySQL, and per-key ``JSON_EXTRACT`` comparisons on SQLite.
+    """
+
+    inherit_cache = True
+    type = NullType()
+
+    def __init__(self, column, kv_dict: dict[str, str]):
+        self.column = column
+        self.kv_dict = kv_dict
+

Review Comment:
   `JsonContains` sets `inherit_cache = True` but doesn’t define any 
traversal/caching metadata for its state (`column`, `kv_dict`). In SQLAlchemy 
this can lead to incorrect SQL compilation caching (different `kv_dict` values 
reusing a cached statement) or cache-key warnings/errors. Either define 
appropriate traversal internals (so the cache key includes `column` and 
`kv_dict`) or set `inherit_cache = False`.



##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -267,6 +269,56 @@ def depends_search(
     return depends_search
 
 
+class _JsonKVFilter(BaseParam[dict[str, str]]):
+    """
+    Filter on a JSON column by multiple key-value pairs (AND logic).
+
+    Uses dialect-aware SQL: ``@>`` (JSONB containment, GIN-indexable) on
+    PostgreSQL, ``JSON_CONTAINS`` on MySQL, and ``JSON_EXTRACT`` on SQLite.
+    """
+
+    def __init__(
+        self,
+        attribute: ColumnElement,
+        value: dict[str, str] | None = None,
+        skip_none: bool = True,
+    ) -> None:
+        super().__init__(skip_none=skip_none)
+        self.attribute: ColumnElement = attribute
+        self.value = value
+
+    def to_orm(self, select: Select) -> Select:
+        if not self.value:
+            return select
+        return select.where(JsonContains(self.attribute, self.value))
+
+    @classmethod
+    def depends(cls, *args: Any, **kwargs: Any) -> Self:
+        raise NotImplementedError("Use json_kv_filter_factory instead.")
+
+
+def json_kv_filter_factory(
+    attribute: ColumnElement,
+    param_name: str = "extra",
+) -> Callable[[list[str]], _JsonKVFilter]:
+    DESCRIPTION = (
+        "Filter by JSON key-value pairs. Repeat for multiple conditions (AND logic). "
+        "Format: key=value (e.g. extra=region=us&extra=env=prod)."
+    )
+
+    def depends_json_kv(
+        values: list[str] = Query(alias=param_name, default_factory=list, description=DESCRIPTION),
+    ) -> _JsonKVFilter:
+        kv_dict: dict[str, str] = {}
+        for item in values:
+            if "=" in item:
+                k, v = item.split("=", 1)
+                kv_dict[k] = v

Review Comment:
   `json_kv_filter_factory` silently ignores malformed `extra` entries that 
don’t contain `=`. Given the API description says the format is `key=value`, 
passing an invalid value currently results in the filter being dropped 
(potentially returning more rows than intended) with no client feedback. 
Consider validating each item and raising an HTTP 422 (or 400) when an entry is 
malformed, to match other parameter transformers in this module that error on 
invalid input.
   ```suggestion
               if "=" not in item:
                   raise HTTPException(
                       status_code=HTTP_422_UNPROCESSABLE_CONTENT,
                       detail=f"Invalid value for parameter '{param_name}': {item!r}. Expected format 'key=value'.",
                   )
               k, v = item.split("=", 1)
               kv_dict[k] = v
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py:
##########
@@ -73,6 +74,16 @@ def _get_asset_events_through_sql_clauses(
     )
 
 
+def _parse_extra_params(extra: list[str] | None) -> dict[str, str]:
+    """Parse repeated ``key=value`` query params into a dict."""
+    result: dict[str, str] = {}
+    for item in extra or []:
+        if "=" in item:
+            k, v = item.split("=", 1)
+            result[k] = v

Review Comment:
   `_parse_extra_params()` silently drops any `extra` values that don’t contain 
`=`. Since the endpoint advertises `key=value` format, consider validating and 
returning a 422/400 on malformed entries instead of ignoring them (otherwise a 
typo like `extra=region` results in the filter being skipped and broader 
results returned).
   ```suggestion
           if "=" not in item:
               raise HTTPException(
                   status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                   detail={
                       "reason": "Invalid parameter",
                       "message": f"Invalid extra parameter format: {item!r}. Expected 'key=value'.",
                   },
               )
           k, v = item.split("=", 1)
           result[k] = v
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to