Copilot commented on code in PR #62963:
URL: https://github.com/apache/airflow/pull/62963#discussion_r3032145799


##########
providers/common/ai/src/airflow/providers/common/ai/utils/dq_validation.py:
##########
@@ -0,0 +1,475 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Built-in and custom validator factories for 
:class:`~airflow.providers.common.ai.operators.llm_data_quality.LLMDataQualityOperator`.
+
+Each factory returns a ``Callable[[Any], bool]`` that can be placed directly 
in the
+``validators`` dict alongside plain lambdas.  They are intentionally decoupled 
from
+the operator so they can be tested and composed independently.
+
+Custom validators can be registered with :func:`register_validator` so that the
+operator automatically injects their ``llm_context`` into the LLM system 
prompt,
+guiding SQL generation for custom metric types.
+
+Usage::
+
+    from airflow.providers.common.ai.utils.dq_validation import (
+        null_pct_check,
+        row_count_check,
+        duplicate_pct_check,
+        between_check,
+        exact_check,
+        register_validator,
+    )
+
+    # Built-in validators
+    validators = {
+        "email_nulls": null_pct_check(max_pct=0.05),
+        "min_customers": row_count_check(min_count=1000),
+        "dup_emails": duplicate_pct_check(max_pct=0.01),
+        "amount_range": between_check(min_val=0.0, max_val=1_000_000.0),
+        "active_flag": exact_check(expected=0),
+        "negative_rev": lambda v: v == 0,
+    }
+
+
+    # Custom validator with LLM context
+    @register_validator(
+        "freshness_check",
+        llm_context=(
+            "Compute hours since the most recent row. "
+            "SQL pattern: EXTRACT(EPOCH FROM (NOW() - MAX(ts_col))) / 3600.0. "
+            "Returns a DOUBLE representing hours elapsed."
+        ),
+        check_category="freshness",
+    )
+    def freshness_check(*, max_hours: float):
+        def _check(value):
+            return float(value) <= max_hours
+
+        return _check
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import Any
+
+# ------------------------------------------------------------------
+# Validator registry — allows custom validators with LLM context
+# ------------------------------------------------------------------
+
+
+@dataclass(frozen=True)
+class ValidatorEntry:
+    """
+    Metadata for a registered validator factory.
+
+    :param factory: Callable that returns a ``Callable[[Any], bool]`` 
validator.
+    :param llm_context: Optional hint injected into the LLM system prompt so
+        the model knows what SQL metric format this validator expects.
+    :param check_category: Optional custom check category.  When set, the LLM
+        is instructed to use this category for grouping.
+    :param row_level: When ``True`` the LLM is instructed to generate a plain
+        ``SELECT pk, col FROM table`` (no aggregation).  The planner fetches
+        every row and applies the validator callable to each column value,
+        then reports ``{total, invalid, invalid_pct, sample_violations}``.
+    """
+
+    factory: Callable[..., Callable[[Any], bool]]
+    llm_context: str = ""
+    check_category: str = ""
+    row_level: bool = False
+
+
+class ValidatorRegistry:
+    """
+    Registry for reusable validator factories with optional LLM context.
+
+    Validators registered here can carry an ``llm_context`` string that the
+    operator automatically injects into the LLM system prompt, guiding the
+    model to produce SQL that returns the metric format the validator expects.
+
+    A module-level :data:`default_registry` instance is available.  Use the
+    convenience decorator :func:`register_validator` to register into it.
+    """
+
+    def __init__(self) -> None:
+        self._entries: dict[str, ValidatorEntry] = {}
+
+    def register(
+        self,
+        name: str,
+        *,
+        llm_context: str = "",
+        check_category: str = "",
+        row_level: bool = False,
+    ) -> Callable[[Callable[..., Callable[[Any], bool]]], Callable[..., 
Callable[[Any], bool]]]:
+        """
+        Return a decorator that registers a validator factory under *name*.
+
+        :param name: Unique name for this validator.
+        :param llm_context: SQL generation hint injected into the LLM prompt.
+        :param check_category: Custom check category for LLM grouping.
+        :param row_level: When ``True``, the LLM generates a plain SELECT
+            returning raw row values instead of an aggregate query.  The
+            planner applies the validator to each row and aggregates results.
+        :raises ValueError: If *name* is already registered.
+        """
+        if name in self._entries:
+            raise ValueError(
+                f"Validator {name!r} is already registered. "
+                "Use a different name or unregister the existing one first."
+            )
+
+        def _decorator(
+            factory: Callable[..., Callable[[Any], bool]],
+        ) -> Callable[..., Callable[[Any], bool]]:
+            self._entries[name] = ValidatorEntry(
+                factory=factory,
+                llm_context=llm_context,
+                check_category=check_category,
+                row_level=row_level,
+            )
+
+            # Wrap the factory so every closure it returns carries the full 
set of
+            # introspection attributes.  Factory authors no longer need to 
stamp these
+            # manually — the decorator handles it automatically.
+            def _wrapped_factory(*args: Any, **kwargs: Any) -> Callable[[Any], 
bool]:
+                closure = factory(*args, **kwargs)
+                # Build a human-readable call representation.
+                arg_parts = [repr(a) for a in args]
+                kwarg_parts = [f"{k}={v!r}" for k, v in kwargs.items()]
+                call_str = f"{name}({', '.join(arg_parts + kwarg_parts)})"
+                # Stamp only when the factory has not set the attribute 
explicitly.
+                if not hasattr(closure, "_validator_name"):
+                    closure._validator_name = name  # type: 
ignore[attr-defined]
+                if not hasattr(closure, "_row_level"):
+                    closure._row_level = row_level  # type: 
ignore[attr-defined]
+                for k, v in kwargs.items():
+                    if not hasattr(closure, f"_{k}"):
+                        setattr(closure, f"_{k}", v)  # e.g. _max_pct, 
_min_count
+                if ".<locals>." in closure.__qualname__:
+                    closure.__qualname__ = call_str
+                if "__repr__" not in closure.__dict__:
+                    closure.__repr__ = lambda: call_str  # type: 
ignore[method-assign]
+                return closure
+
+            # Preserve factory identity attributes on the wrapper.
+            _wrapped_factory._validator_name = name  # type: 
ignore[attr-defined]
+            _wrapped_factory._llm_context = llm_context  # type: 
ignore[attr-defined]
+            _wrapped_factory._check_category = check_category  # type: 
ignore[attr-defined]
+            _wrapped_factory._row_level = row_level  # type: 
ignore[attr-defined]
+            _wrapped_factory.__name__ = factory.__name__
+            _wrapped_factory.__qualname__ = factory.__qualname__
+            _wrapped_factory.__doc__ = factory.__doc__
+
+            # Update the registry entry to point to the wrapped factory.
+            self._entries[name] = ValidatorEntry(
+                factory=_wrapped_factory,
+                llm_context=llm_context,
+                check_category=check_category,
+                row_level=row_level,
+            )
+            return _wrapped_factory
+
+        return _decorator
+
+    def get(self, name: str) -> ValidatorEntry:
+        """
+        Return the :class:`ValidatorEntry` for *name*.
+
+        :raises KeyError: If *name* is not registered.
+        """
+        try:
+            return self._entries[name]
+        except KeyError:
+            raise KeyError(
+                f"Validator {name!r} is not registered. Available validators: 
{sorted(self._entries)}"
+            ) from None
+
+    def list_validators(self) -> list[str]:
+        """Return sorted list of all registered validator names."""
+        return sorted(self._entries)
+
+    def is_row_level(self, validator: Callable[[Any], bool]) -> bool:
+        """
+        Return ``True`` when *validator* was produced by a row-level factory.
+
+        Checks the ``_row_level`` attribute set by the factory closure and,
+        as a fallback, the registry entry for the factory name.
+        """
+        if getattr(validator, "_row_level", False):
+            return True
+        factory_name: str | None = getattr(validator, "_validator_name", None)
+        if factory_name and factory_name in self._entries:
+            return self._entries[factory_name].row_level
+        return False

Review Comment:
   `ValidatorRegistry.is_row_level()` reads the flag with 
`getattr(validator, "_row_level", False)`, so an explicit `_row_level = False` 
on the validator callable is indistinguishable from “attribute missing”. In 
both cases the method falls back to the registry entry (which may be 
`row_level=True`). This means factory authors cannot override row-level 
behavior on a per-closure basis: a validator that explicitly sets 
`_row_level = False` can still be treated as row-level.
   
   Consider distinguishing “missing attribute” from “present but False”, e.g. 
`if hasattr(validator, '_row_level'): return bool(validator._row_level)`, and 
only fall back to the registry when the attribute is absent.



##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -121,6 +122,20 @@ def execute_query(self, query: str, max_rows: int | None = 
None) -> dict[str, li
         except Exception as e:
             raise QueryExecutionException(f"Error while executing query: {e}")
 
+    def iter_query_row_chunks(self, query: str) -> Iterator[dict[str, 
list[Any]]]:
+        """
+        Execute *query* and yield one column-dict per RecordBatch, streaming 
results.
+
+        :param query: SQL SELECT query to execute.
+        :raises QueryExecutionException: On SQL execution errors.
+        """
+        try:
+            df = self.session_context.sql(query)
+            for batch in df:
+                yield batch.to_pyarrow().to_pydict()
+        except Exception as e:
+            raise QueryExecutionException(f"Error while executing query: {e}")

Review Comment:
   `iter_query_row_chunks()` iterates directly over the DataFusion DataFrame 
(`for batch in df:`). In the DataFusion Python API, a `DataFrame` is not 
guaranteed to be iterable; results are usually retrieved via methods like 
`collect()` / `to_pyarrow_table()` / `to_pydict()`. If `DataFrame` isn’t 
iterable in the pinned version, the loop raises `TypeError` on the first 
iteration, the broad `except Exception` re-wraps it as 
`QueryExecutionException`, and the row-level DataFusion execution path is 
broken for every query.
   
   Please switch to a batching API that the pinned DataFusion version supports 
(for example `df.execute_stream()`, which yields record batches), and chain 
the exception (`raise QueryExecutionException(...) from e`) so the original 
stack trace is preserved.



##########
providers/common/ai/tests/unit/common/ai/utils/test_db_schema.py:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.common.ai.utils.db_schema import build_schema_context
+
+
+class TestBuildSchemaContext:
+    def test_raises_when_table_names_given_without_db_hook(self):
+        with pytest.raises(ValueError, match="table_names requires 
db_conn_id"):
+            build_schema_context(
+                db_hook=None,
+                table_names=["customers"],
+                schema_context=None,
+                datasource_config=MagicMock(),
+            )
+
+    def test_uses_bulk_schema_fetch_when_available(self):
+        mock_db_hook = MagicMock()
+        mock_db_hook.get_table_schemas.return_value = {
+            "customers": [{"name": "id", "type": "INT"}],
+            "orders": [{"name": "order_id", "type": "INT"}],
+        }

Review Comment:
   These tests create the DB hook as a bare `MagicMock()` with no `spec` 
(`mock_db_hook = MagicMock()`). A mock without a spec accepts any attribute 
access, so it can hide real API mismatches (e.g. a renamed or misspelled 
`get_table_schema` / `get_table_schemas` method) until runtime. Prefer 
`MagicMock(spec=DbApiHook)` or `create_autospec(DbApiHook)`, stubbing only the 
methods the test needs, so the test suite catches interface drift earlier.



##########
providers/common/ai/src/airflow/providers/common/ai/utils/db_schema.py:
##########
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared database hook and schema introspection utilities.
+
+These helpers are used by both 
:class:`~airflow.providers.common.ai.operators.llm_sql.LLMSQLQueryOperator`
+and 
:class:`~airflow.providers.common.ai.operators.llm_data_quality.LLMDataQualityOperator`
 to
+avoid code duplication while keeping both operators decoupled from each other.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.providers.common.compat.sdk import BaseHook
+
+if TYPE_CHECKING:
+    from airflow.providers.common.sql.config import DataSourceConfig
+    from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+log = logging.getLogger(__name__)
+
+# SQLAlchemy dialect_name → sqlglot dialect mapping for names that differ.
+SQLALCHEMY_TO_SQLGLOT_DIALECT: dict[str, str] = {
+    "postgresql": "postgres",
+    "mssql": "tsql",
+}
+
+
+def get_db_hook(db_conn_id: str) -> DbApiHook:
+    """
+    Return a :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` for 
*db_conn_id*.
+
+    :param db_conn_id: Airflow connection ID that resolves to a ``DbApiHook``.
+    :raises ValueError: If the connection does not resolve to a ``DbApiHook``.
+    """
+    # Lazy load to avoid hard dependency on common.sql
+    from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+    connection = BaseHook.get_connection(db_conn_id)
+    hook = connection.get_hook()
+    if not isinstance(hook, DbApiHook):
+        raise ValueError(
+            f"Connection {db_conn_id!r} does not provide a DbApiHook. Got 
{type(hook).__name__}."
+        )
+    return hook
+
+
+def resolve_dialect(db_hook: DbApiHook | None, explicit_dialect: str | None) 
-> str | None:
+    """
+    Resolve the SQL dialect from an explicit parameter or a database hook.
+
+    Normalises SQLAlchemy dialect names to sqlglot equivalents
+    (e.g. ``postgresql`` → ``postgres``).
+
+    :param db_hook: Database hook to read ``dialect_name`` from when 
*explicit_dialect* is absent.
+    :param explicit_dialect: Caller-supplied dialect string; takes priority 
over the hook.
+    :return: Resolved dialect string, or ``None`` when neither source provides 
one.
+    """
+    raw = explicit_dialect
+    if not raw and db_hook and hasattr(db_hook, "dialect_name"):
+        candidate = db_hook.dialect_name
+        raw = candidate if isinstance(candidate, str) else None
+    if raw:
+        return SQLALCHEMY_TO_SQLGLOT_DIALECT.get(raw, raw)
+    return None
+
+
+def build_schema_context(
+    *,
+    db_hook: DbApiHook | None,
+    table_names: list[str] | None,
+    schema_context: str | None,
+    datasource_config: DataSourceConfig | None,
+) -> str:
+    """
+    Return a schema description string suitable for inclusion in an LLM prompt.
+
+    Resolution order:
+    1. *schema_context* — returned as-is when provided (manual override).
+    2. DB introspection via *db_hook* + *table_names*.
+    3. Object-storage introspection via *datasource_config*.
+    4. Empty string when none of the above are available.
+
+    :param db_hook: Hook used for relational-database schema introspection.
+    :param table_names: Table names to introspect via *db_hook*.
+    :param schema_context: Manual schema description; bypasses introspection 
when set.
+    :param datasource_config: DataFusion datasource config for object-storage 
schema.
+    :raises ValueError: If *table_names* are provided but none yield schema 
information.
+    :raises ValueError: If *datasource_config* is ``None`` and no DB tables 
are available.

Review Comment:
   The docstring says `build_schema_context()` raises a `ValueError` when 
`datasource_config` is `None` and no DB tables are available, but the 
implementation returns an empty string when neither `(db_hook and 
table_names)` nor `datasource_config` is provided. That is also what item 4 of 
the docstring's own “Resolution order” list states. Please align the `:raises` 
entry with the actual behavior (or raise in that case if that’s the intended 
contract). The empty suggestion below drops the inaccurate `:raises` line.
   ```suggestion
   
   ```



##########
providers/common/ai/tests/unit/common/ai/utils/test_dq_planner.py:
##########
@@ -0,0 +1,1305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.ai.utils.dq_models import DQCheck, DQCheckGroup, 
DQPlan
+from airflow.providers.common.ai.utils.dq_planner import SQLDQPlanner
+from airflow.providers.common.ai.utils.sql_validation import SQLSafetyError
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+
+def _make_plan(*check_names: str) -> DQPlan:
+    """Helper: build a minimal DQPlan with one group per check."""
+    groups = [
+        DQCheckGroup(
+            group_id="numeric_aggregate",
+            query=f"SELECT COUNT(*) AS {name}_count FROM t",
+            checks=[DQCheck(check_name=name, metric_key=f"{name}_count", 
group_id="numeric_aggregate")],
+        )
+        for name in check_names
+    ]
+    return DQPlan(groups=groups)
+
+
+def _make_llm_hook(plan: DQPlan) -> MagicMock:
+    """Helper: mock PydanticAIHook that returns *plan* from agent.run_sync."""
+    mock_usage = MagicMock(requests=1, tool_calls=0, input_tokens=100, 
output_tokens=50, total_tokens=150)
+    mock_result = MagicMock(spec=["output", "all_messages", "usage", 
"response"])
+    mock_result.output = plan
+    mock_result.all_messages.return_value = []
+    mock_result.usage.return_value = mock_usage
+    mock_result.response.model_name = "test-model"
+    mock_agent = MagicMock(spec=["run_sync"])
+    mock_agent.run_sync.return_value = mock_result
+    mock_hook = MagicMock(spec=PydanticAIHook)
+    mock_hook.create_agent.return_value = mock_agent
+    return mock_hook
+
+
+class TestSQLDQPlannerBuildSchema:
+    def test_returns_manual_schema_context_verbatim(self):
+        planner = SQLDQPlanner(llm_hook=MagicMock(spec=PydanticAIHook), 
db_hook=None)
+        result = planner.build_schema_context(
+            table_names=None,
+            schema_context="Table: t\nColumns: id INT",
+        )
+        assert result == "Table: t\nColumns: id INT"
+
+    def test_introspects_via_db_hook_when_no_manual_context(self):
+        mock_db_hook = MagicMock()
+        mock_db_hook.get_table_schema.return_value = [{"name": "id", "type": 
"INT"}]
+
+        planner = SQLDQPlanner(llm_hook=MagicMock(spec=PydanticAIHook), 
db_hook=mock_db_hook)
+        result = planner.build_schema_context(

Review Comment:
   These tests create the DB hook as a bare `MagicMock()` with no `spec` (e.g. 
`mock_db_hook = MagicMock()`), which can mask interface mismatches (typos or 
missing methods) and makes refactors riskier. This module already imports 
`DbApiHook` and specs the LLM hook with `MagicMock(spec=PydanticAIHook)`, so 
prefer `MagicMock(spec=DbApiHook)` (or `create_autospec`) for 
hooks/engines/cursors as well, so invalid attribute access fails fast.



##########
providers/common/ai/src/airflow/providers/common/ai/utils/dq_planner.py:
##########
@@ -0,0 +1,873 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+SQL-based data-quality plan generation and execution.
+
+:class:`SQLDQPlanner` is the single entry-point for all SQL DQ logic.
+It is deliberately kept separate from the operator so it can be unit-tested
+without an Airflow context and later swapped for GEX/SODA planners without
+touching the operator.
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Iterator, Sequence
+from contextlib import closing
+from typing import TYPE_CHECKING, Any
+
+try:
+    from airflow.providers.common.ai.utils.sql_validation import (
+        DEFAULT_ALLOWED_TYPES,
+        SQLSafetyError,
+        validate_sql as _validate_sql,
+    )
+except ImportError as e:
+    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException(e)
+
+from airflow.providers.common.ai.utils.db_schema import build_schema_context, 
resolve_dialect
+from airflow.providers.common.ai.utils.dq_models import DQCheckGroup, DQPlan, 
RowLevelResult, UnexpectedResult
+from airflow.providers.common.ai.utils.logging import log_run_summary
+
+if TYPE_CHECKING:
+    from pydantic_ai import Agent
+    from pydantic_ai.messages import ModelMessage
+
+    from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+    from airflow.providers.common.sql.config import DataSourceConfig
+    from airflow.providers.common.sql.datafusion.engine import DataFusionEngine
+    from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+log = logging.getLogger(__name__)
+
+_MAX_CHECKS_PER_GROUP = 5
+# Maximum rows fetched from DB per chunk during row-level processing — avoids 
loading the
+# entire result set into memory at once.
+_ROW_LEVEL_CHUNK_SIZE = 10_000
+# Hard cap on violation samples stored per check — independent of SQL LIMIT 
and chunk size.
+_MAX_VIOLATION_SAMPLES = 100
+
+_PLANNING_SYSTEM_PROMPT = """\
+You are a data-quality SQL expert.
+
+Given a set of named data-quality checks and a database schema, produce a \
+DQPlan that minimises the number of SQL queries while keeping each group \
+focused and manageable.
+
+GROUPING STRATEGY (multi-dimensional):
+  Group checks by **(target_table, check_category)**.  Checks on the same table
+  that belong to different categories MUST be in separate groups.
+
+  Allowed check_category values (assign one per check based on its 
description):
+    - null_check      — null / missing value counts or percentages
+    - uniqueness      — duplicate detection, cardinality checks
+    - validity        — regex / format / pattern matching on string columns
+    - numeric_range   — range, bounds, or statistical checks on numeric columns
+    - row_count       — total row counts or existence checks
+    - string_format   — length, encoding, whitespace, or character-set checks
+    - row_level       — per-row or anomaly checks that evaluate individual 
records
+
+  Row-level checks still follow the same grouping rule: group by 
(target_table, check_category="row_level").
+  MAX {max_checks_per_group} CHECKS PER GROUP:
+    If a (table, category) pair has more than {max_checks_per_group} checks,
+    split them into sub-groups of at most {max_checks_per_group}.
+
+  GROUP-ID NAMING:
+    Use the pattern "{{table}}_{{category}}_{{part}}".
+    Examples: customers_null_check_1, orders_validity_1, orders_validity_2
+
+  RATIONALE:
+    Keeping string-column checks (validity, string_format) apart from
+    numeric-column checks (numeric_range, null_check on numbers) produces
+    simpler SQL and makes failures easier to diagnose.
+
+  CORRECT (two groups for same table, different categories):
+    Group customers_null_check_1:
+      SELECT
+        (COUNT(CASE WHEN email IS NULL THEN 1 END) * 100.0 / COUNT(*)) AS 
null_email_pct,
+        (COUNT(CASE WHEN name IS NULL THEN 1 END) * 100.0 / COUNT(*)) AS 
null_name_pct
+      FROM customers
+
+    Group customers_validity_1:
+      SELECT
+        COUNT(CASE WHEN phone NOT LIKE '+___-___-____' THEN 1 END) AS 
invalid_phone_fmt
+      FROM customers
+
+  WRONG (mixing null-check and regex-validity in one group):
+    SELECT
+      (COUNT(CASE WHEN email IS NULL THEN 1 END) * 100.0 / COUNT(*)) AS 
null_email_pct,
+      COUNT(CASE WHEN phone NOT LIKE '+___-___-____' THEN 1 END) AS 
invalid_phone_fmt
+    FROM customers
+
+OUTPUT RULES:
+  1. Each output column must be aliased to exactly the metric_key of its check.
+     Example: ... AS null_email_pct
+  2. Each check_name must exactly match the key in the prompts dict.
+  3. metric_key values must be valid SQL column aliases (snake_case, no 
spaces).
+  4. Generates only SELECT queries — no INSERT, UPDATE, DELETE, DROP, or DDL.
+  5. Use {dialect} syntax.
+  6. Each check must appear in exactly ONE group.
+  7. Each check must have a check_category from the allowed list above.
+  8. Return a valid DQPlan object. No extra commentary.
+"""
+
+_DATAFUSION_SYNTAX_SECTION = """\
+
+DATAFUSION SQL SYNTAX RULES:
+  The target engine is Apache DataFusion.  Observe these syntax differences
+  from standard PostgreSQL / ANSI SQL:
+
+  1. NO "FILTER (WHERE ...)" clause.  Use CASE expressions instead:
+       WRONG:  COUNT(*) FILTER (WHERE email IS NULL)
+       RIGHT:  COUNT(CASE WHEN email IS NULL THEN 1 END)
+
+  2. Regex matching uses the tilde operator:
+       column ~ 'pattern'    (match)
+       column !~ 'pattern'   (no match)
+     Do NOT use SIMILAR TO or POSIX-style ~* (case-insensitive).
+
+  3. CAST syntax — prefer CAST(expr AS type) over :: shorthand.
+
+  4. String functions: Use CHAR_LENGTH (not LEN), SUBSTR (not SUBSTRING with 
FROM/FOR).
+
+  5. Integer division: DataFusion performs integer division for INT/INT.
+     Use CAST(expr AS DOUBLE) to force floating-point division.
+
+  6. Boolean literals: Use TRUE / FALSE (not 1 / 0).
+
+  7. LIMIT is supported.  OFFSET is supported.  FETCH FIRST is NOT supported.
+
+  8. NULL handling: COALESCE, NULLIF, IFNULL are all supported.
+     NVL and ISNULL are NOT supported.
+"""
+
+_UNEXPECTED_QUERY_PROMPT_SECTION = """\
+
+UNEXPECTED VALUE COLLECTION:
+  For checks whose check_category is "validity" or "string_format", also
+  generate an unexpected_query field on the DQCheck.  This query must:
+    - SELECT the primary key column(s) and the column(s) being validated
+    - WHERE the row violates the check condition (the negation of the check)
+    - LIMIT {sample_size}
+    - Use {dialect} syntax
+    - Be a standalone SELECT (not a subquery of the group query)
+
+  For all other categories (null_check, uniqueness, numeric_range, row_count),
+  set unexpected_query to null — these are aggregate checks where individual
+  violating rows are not meaningful.
+
+  Example for a phone-format validity check:
+    unexpected_query: "SELECT id, phone FROM customers WHERE phone !~ 
'^\\d{{4}}-\\d{{4}}-\\d{{4}}$' LIMIT 100"
+"""
+
+_ROW_LEVEL_PROMPT_SECTION = """
+
+ROW-LEVEL CHECKS:
+  Some checks are marked as row_level.  For these:
+    - Generate a SELECT that returns the primary key column(s) and the column
+      being validated.  Do NOT aggregate.
+    - Set row_level = true on the DQCheck entry.
+    - metric_key must be the name of the column containing the value to 
validate
+      (the Python validator will read row[metric_key] for each row).
+    - {row_level_limit_clause}
+    - Place ALL row-level checks for the same table in a single group.
+
+  Row-level check names that require this treatment: {row_level_check_names}
+"""
+
+
+class SQLDQPlanner:
+    """
+    Generates and executes a SQL-based 
:class:`~airflow.providers.common.ai.utils.dq_models.DQPlan`.
+
+    :param llm_hook: Hook used to call the LLM for plan generation.
+    :param db_hook: Hook used to execute generated SQL against the database.
+    :param dialect: SQL dialect forwarded to the LLM prompt and 
``validate_sql``.
+        Auto-detected from *db_hook* when ``None``.
+    :param max_sql_retries: Maximum number of times a failing SQL group query 
is sent
+        back to the LLM for correction before the error is re-raised.  Default 
``2``.
+    :param validator_contexts: Pre-built LLM context string from
+        
:meth:`~airflow.providers.common.ai.utils.dq_validation.ValidatorRegistry.build_llm_context`.
+        Appended to the system prompt so the LLM knows what metric format each
+        custom validator expects.
+    :param row_validators: Mapping of ``{check_name: row_level_callable}`` for
+        checks that require row-by-row Python validation.  When a check's name
+        appears here, ``execute_plan`` fetches all (or sampled) rows and 
applies
+        the callable to each value instead of reading a single aggregate 
scalar.
+    :param row_level_sample_size: Maximum number of rows to fetch for row-level
+        checks.  ``None`` (default) performs a full scan.  A positive integer
+        instructs the LLM to add ``LIMIT N`` to the generated SELECT.
+    """
+
+    def __init__(
+        self,
+        *,
+        llm_hook: PydanticAIHook,
+        db_hook: DbApiHook | None,
+        dialect: str | None = None,
+        max_sql_retries: int = 2,
+        datasource_config: DataSourceConfig | None = None,
+        system_prompt: str = "",
+        agent_params: dict[str, Any] | None = None,
+        collect_unexpected: bool = False,
+        unexpected_sample_size: int = 100,
+        validator_contexts: str = "",
+        row_validators: dict[str, Any] | None = None,
+        row_level_sample_size: int | None = None,
+    ) -> None:
+        self._llm_hook = llm_hook
+        self._db_hook = db_hook
+        self._datasource_config = datasource_config
+        self._dialect = resolve_dialect(db_hook, dialect)
+        # Track whether the execution target is DataFusion so the prompt can
+        # include DataFusion-specific syntax rules.  The dialect stays None
+        # (generic SQL) for sqlglot validation — sqlglot has no DataFusion 
dialect.
+        self._is_datafusion = db_hook is None and datasource_config is not None
+        # When targeting DataFusion, use PostgreSQL dialect for sqlglot 
validation
+        # because DataFusion shares regex operators (~, !~) that the generic 
SQL
+        # parser does not recognise.
+        self._validation_dialect: str | None = "postgres" if 
self._is_datafusion else self._dialect
+        self._max_sql_retries = max_sql_retries
+        self._extra_system_prompt = system_prompt
+        self._agent_params: dict[str, Any] = agent_params or {}
+        self._collect_unexpected = collect_unexpected
+        self._unexpected_sample_size = unexpected_sample_size
+        self._validator_contexts = validator_contexts
+        self._row_validators: dict[str, Any] = row_validators or {}
+        self._row_level_sample_size = row_level_sample_size
+        self._cached_datafusion_engine: DataFusionEngine | None = None
+        self._plan_agent: Agent[None, DQPlan] | None = None
+        self._plan_all_messages: list[ModelMessage] | None = None
+
+    def build_schema_context(
+        self,
+        table_names: list[str] | None,
+        schema_context: str | None,
+    ) -> str:
+        """
+        Return a schema description string for inclusion in the LLM prompt.
+
+        Delegates to 
:func:`~airflow.providers.common.ai.utils.db_schema.build_schema_context`.
+        """
+        return build_schema_context(
+            db_hook=self._db_hook,
+            table_names=table_names,
+            schema_context=schema_context,
+            datasource_config=self._datasource_config,
+        )
+
+    def generate_plan(self, prompts: dict[str, str], schema_context: str) -> 
DQPlan:
+        """
+        Ask the LLM to produce a 
:class:`~airflow.providers.common.ai.utils.dq_models.DQPlan`.
+
+        The LLM receives the user prompts, schema context, and planning 
instructions
+        as a structured-output call (``output_type=DQPlan``).  After 
generation the
+        method verifies that the returned ``check_names`` exactly match
+        ``prompts.keys()``.
+
+        :param prompts: ``{check_name: natural_language_description}`` dict.
+        :param schema_context: Schema description previously built via
+            :meth:`build_schema_context`.
+        :raises ValueError: If the LLM's plan does not cover every prompt key
+            exactly once.
+        """
+        dialect_label = self._dialect or ("DataFusion-compatible SQL" if 
self._is_datafusion else "SQL")
+        system_prompt = _PLANNING_SYSTEM_PROMPT.format(
+            dialect=dialect_label, max_checks_per_group=_MAX_CHECKS_PER_GROUP
+        )
+
+        if self._is_datafusion:
+            system_prompt += _DATAFUSION_SYNTAX_SECTION
+
+        if self._collect_unexpected:
+            system_prompt += _UNEXPECTED_QUERY_PROMPT_SECTION.format(
+                dialect=dialect_label, sample_size=self._unexpected_sample_size
+            )
+
+        if schema_context:
+            system_prompt += f"\nAvailable schema:\n{schema_context}\n"
+
+        if self._validator_contexts:
+            system_prompt += self._validator_contexts
+
+        if self._row_validators:
+            row_level_check_names = ", ".join(sorted(self._row_validators))
+            if self._row_level_sample_size is not None:
+                limit_clause = f"Add LIMIT {self._row_level_sample_size} to 
the query."
+            else:
+                limit_clause = "Do NOT add a LIMIT — return all rows."
+            system_prompt += _ROW_LEVEL_PROMPT_SECTION.format(
+                row_level_check_names=row_level_check_names,
+                row_level_limit_clause=limit_clause,
+            )
+
+        if self._extra_system_prompt:
+            system_prompt += f"\nAdditional 
instructions:\n{self._extra_system_prompt}\n"
+
+        user_message = self._build_user_message(prompts)
+
+        log.info("Using system prompt:\n%s", system_prompt)
+        log.info("Using user message:\n%s", user_message)
+
+        agent = self._llm_hook.create_agent(
+            output_type=DQPlan, instructions=system_prompt, 
**self._agent_params
+        )
+        result = agent.run_sync(user_message)
+        log_run_summary(log, result)
+
+        # Persist the agent and full conversation so execute_plan can continue
+        # the same chat thread when asking for SQL corrections.
+        self._plan_agent = agent
+        self._plan_all_messages = result.all_messages()
+
+        plan: DQPlan = result.output
+
+        self._validate_plan_coverage(plan, prompts)
+        self._validate_group_sizes(plan)
+        return plan
+
+    def execute_plan(self, plan: DQPlan) -> dict[str, Any]:
+        """
+        Execute every SQL group in *plan* and return a flat ``{check_name: 
value}`` map.
+
+        Each group's query is safety-validated via
+        :func:`~airflow.providers.common.ai.utils.sql_validation.validate_sql` 
before
+        execution.  The first row of each result-set is used; each column 
corresponds
+        to the ``metric_key`` of one 
:class:`~airflow.providers.common.ai.utils.dq_models.DQCheck`.
+
+        :param plan: Plan produced by :meth:`generate_plan`.
+        :raises ValueError: If neither *db_hook* nor *datasource_config* was 
supplied.
+        :raises SQLSafetyError: If a generated query fails AST validation even 
after
+            ``max_sql_retries`` LLM correction attempts.
+        :raises ValueError: If a query result does not contain an expected 
metric column.
+        """
+        if self._db_hook is None and self._datasource_config is None:
+            raise ValueError("Either db_conn_id or datasource_config is 
required to execute the DQ plan.")
+
+        datafusion_engine: DataFusionEngine | None = None
+        if self._db_hook is None:
+            if self._cached_datafusion_engine is None:
+                self._cached_datafusion_engine = 
self._build_datafusion_engine()
+            datafusion_engine = self._cached_datafusion_engine
+
+        results: dict[str, Any] = {}
+
+        for raw_group in plan.groups:
+            group = self._validate_or_fix_group(raw_group)
+            log.debug("Executing DQ group %r:\n%s", group.group_id, 
group.query)
+
+            row_level_flags = {check.row_level for check in group.checks}
+            if len(row_level_flags) > 1:
+                mixed = [(c.check_name, c.row_level) for c in group.checks]
+                raise ValueError(
+                    f"Group {group.group_id!r} contains both row-level and 
aggregate checks. "
+                    f"Every check in a group must be homogeneous. Checks: 
{mixed}"
+                )
+
+            # Row-level checks and aggregate checks are mutually exclusive 
within a group
+            # because the LLM places them in separate groups based on the 
system prompt.
+            if any(check.row_level for check in group.checks):
+                row_level_results = self._execute_row_level_group(group, 
datafusion_engine)
+                results.update(row_level_results)
+                continue
+
+            if datafusion_engine is not None:
+                row = self._run_datafusion_group(datafusion_engine, group)
+            else:
+                row = self._run_db_group(group)
+
+            for check in group.checks:
+                if check.metric_key not in row:
+                    raise ValueError(
+                        f"Query for group {group.group_id!r} did not return "
+                        f"column {check.metric_key!r} required by check 
{check.check_name!r}. "
+                        f"Available columns: {list(row.keys())}"
+                    )
+                results[check.check_name] = row[check.metric_key]
+
+        return results
+
+    def _execute_row_level_group(
+        self,
+        group: DQCheckGroup,
+        datafusion_engine: DataFusionEngine | None,
+    ) -> dict[str, RowLevelResult]:
+        """
+        Apply row-level validators to every row returned by *group.query*.
+
+        :param group: A plan group whose checks all have ``row_level=True``.
+        :param datafusion_engine: Active DataFusion engine or ``None`` for DB.
+        :returns: ``{check_name: RowLevelResult}`` for every row-level check.
+        """
+        active_checks = [
+            check for check in group.checks if check.row_level and 
check.check_name in self._row_validators
+        ]
+        for check in group.checks:
+            if check.row_level and check.check_name not in 
self._row_validators:
+                log.warning("No row-level validator found for check %r — 
skipping.", check.check_name)
+
+        if not active_checks:
+            return {}
+
+        # counters[check_name] = [total, invalid, sample_violations]
+        counters: dict[str, list[Any]] = {check.check_name: [0, 0, []] for 
check in active_checks}
+
+        chunk_iter: Iterator[list[dict[str, Any]]]
+        if datafusion_engine is not None:
+            chunk_iter = self._iter_datafusion_row_chunks(datafusion_engine, 
group.query)
+        else:
+            chunk_iter = self._iter_db_row_chunks(group.query)
+
+        total_rows_seen = 0
+        for chunk in chunk_iter:
+            total_rows_seen += len(chunk)
+            for row in chunk:
+                for check in active_checks:
+                    value = row.get(check.metric_key)
+                    c = counters[check.check_name]
+                    c[0] += 1  # total
+                    try:
+                        passed = 
bool(self._row_validators[check.check_name](value))

Review Comment:
   Row-level validation calls `row.get(check.metric_key)` without verifying that
   the `metric_key` column is actually present in the query result. If the LLM
   generates a row-level SELECT that omits the metric column or aliases it
   incorrectly, every value is read as `None`, so the validator silently runs
   against `None` and produces incorrect DQ results.
   
   Recommend failing fast (checking at least once per chunk) when
   `check.metric_key` is absent from the row dict / cursor description for a
   row-level group, similar to the metric-key validation on the aggregate path,
   which raises `ValueError` when the column is missing.



##########
providers/common/ai/src/airflow/providers/common/ai/operators/llm_data_quality.py:
##########
@@ -0,0 +1,633 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for generating and executing data-quality checks from natural 
language using LLMs."""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from collections.abc import Callable, Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.utils.db_schema import get_db_hook
+from airflow.providers.common.ai.utils.dq_models import (
+    DQCheckFailedError,
+    DQCheckGroup,
+    DQCheckResult,
+    DQPlan,
+    DQReport,
+    RowLevelResult,
+    UnexpectedResult,
+)
+from airflow.providers.common.ai.utils.dq_validation import default_registry
+from airflow.providers.common.compat.sdk import Variable
+
+try:
+    from airflow.providers.common.ai.utils.dq_planner import SQLDQPlanner
+except ImportError as e:
+    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException(e)
+
+if TYPE_CHECKING:
+    from airflow.providers.common.sql.config import DataSourceConfig
+    from airflow.providers.common.sql.hooks.sql import DbApiHook
+    from airflow.sdk import Context
+
+_PLAN_VARIABLE_PREFIX = "dq_plan_"
+_PLAN_VARIABLE_KEY_MAX_LEN = 200  # stay well under Airflow Variable key 
length limit
+
+
+class LLMDataQualityOperator(LLMOperator):
+    """
+    Generate and execute data-quality checks from natural language 
descriptions.
+
+    Each entry in ``prompts`` describes **one** data-quality expectation.
+    The LLM groups related checks into optimised SQL queries, executes them
+    against the target database, and validates each metric against the
+    corresponding entry in ``validators``.  The task fails if any check
+    does not pass, gating downstream tasks on data quality.
+
+    Generated SQL plans are cached in Airflow
+    :class:`~airflow.models.variable.Variable` to avoid repeat LLM calls.
+    Set ``dry_run=True`` to preview the plan without executing it — the
+    serialised plan dict is returned without running any SQL.
+    Set ``require_approval=True`` to gate execution on human review via the
+    HITL interface: the plan is presented to the reviewer first, and SQL
+    checks run only after approval.  ``dry_run`` and ``require_approval``
+    are independent — enabling both returns the plan dict without any
+    approval prompt.
+
+    :param prompts: Mapping of ``{check_name: natural_language_description}``.
+        Each key must be unique.  Use one check per key; the operator enforces
+        a strict one-key → one-check mapping.
+    :param llm_conn_id: Connection ID for the LLM provider.
+    :param model_id: Model identifier (e.g. ``"openai:gpt-4o"``).
+        Overrides the model stored in the connection's extra field.
+    :param system_prompt: Additional instructions appended to the planning 
prompt.
+    :param agent_params: Additional keyword arguments passed to the pydantic-ai
+        ``Agent`` constructor (e.g. ``retries``, ``model_settings``).
+    :param db_conn_id: Connection ID for the database to run checks against.
+        Must resolve to a 
:class:`~airflow.providers.common.sql.hooks.sql.DbApiHook`.
+    :param table_names: Tables to include in the LLM's schema context.
+    :param schema_context: Manual schema description; bypasses DB 
introspection.
+    :param validators: Mapping of ``{check_name: callable}`` where each 
callable
+        receives the raw metric value and returns ``True`` (pass) or ``False`` 
(fail).
+        Keys must be a subset of ``prompts.keys()``.
+        Use built-in factories from
+        :mod:`~airflow.providers.common.ai.utils.dq_validation` or plain 
lambdas::
+
+            from airflow.providers.common.ai.utils.dq_validation import 
null_pct_check
+
+            validators = {
+                "email_nulls": null_pct_check(max_pct=0.05),
+                "row_check": lambda v: v >= 1000,
+            }
+
+    :param dialect: SQL dialect override (``postgres``, ``mysql``, etc.).
+        Auto-detected from *db_conn_id* when not set.
+    :param datasource_config: DataFusion datasource for object-storage schema.
+    :param dry_run: When ``True``, generate and cache the plan but skip 
execution.
+        Returns the serialised plan dict instead of a 
:class:`~airflow.providers.common.ai.utils.dq_models.DQReport`.
+    :param prompt_version: Optional version tag included in the plan cache key.
+        Bump this to invalidate cached plans when prompts change semantically
+        without changing their text.
+    :param collect_unexpected: When ``True``, the LLM generates an
+        ``unexpected_query`` for validity / string-format checks.
+        If any of those checks fail, the unexpected query is executed and
+        the resulting sample rows are included in the report.
+    :param unexpected_sample_size: Maximum number of violating rows to return
+        per failed check.  Default ``100``.
+    :param row_level_sample_size: Maximum number of rows to fetch per row-level
+        check.  ``None`` (default) performs a full table scan — every row is
+        fetched and validated.  A positive integer is passed to the LLM as a
+        ``LIMIT`` clause on the generated SELECT, bounding execution time and
+        memory usage at the cost of sampling coverage.
+    :param require_approval: When ``True``, the operator defers after 
generating
+        and caching the DQ plan.  The plan SQL is surfaced in the HITL 
interface
+        for human review; checks run only after the reviewer approves.  
Inherited
+        from :class:`~airflow.providers.common.ai.operators.llm.LLMOperator`.
+        ``dry_run=True`` takes precedence — combining both flags returns the 
plan
+        dict immediately without requesting approval.
+    """
+
+    template_fields: Sequence[str] = (
+        *LLMOperator.template_fields,
+        "prompts",
+        "db_conn_id",
+        "table_names",
+        "schema_context",
+        "prompt_version",
+        "collect_unexpected",
+        "unexpected_sample_size",
+        "row_level_sample_size",
+    )
+
+    def __init__(
+        self,
+        *,
+        prompts: dict[str, str],
+        db_conn_id: str | None = None,
+        table_names: list[str] | None = None,
+        schema_context: str | None = None,
+        validators: dict[str, Callable[[Any], bool]] | None = None,
+        dialect: str | None = None,
+        datasource_config: DataSourceConfig | None = None,
+        prompt_version: str | None = None,
+        dry_run: bool = False,
+        collect_unexpected: bool = False,
+        unexpected_sample_size: int = 100,
+        row_level_sample_size: int | None = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs.pop("output_type", None)
+        kwargs.setdefault("prompt", "LLMDataQualityOperator")
+        super().__init__(**kwargs)
+
+        self.prompts = prompts
+        self.db_conn_id = db_conn_id
+        self.table_names = table_names
+        self.schema_context = schema_context
+        self.validators = validators or {}
+        self.dialect = dialect
+        self.datasource_config = datasource_config
+        self.prompt_version = prompt_version
+        self.dq_dry_run = dry_run
+        self.collect_unexpected = collect_unexpected
+        self.unexpected_sample_size = unexpected_sample_size
+        self.row_level_sample_size = row_level_sample_size
+
+        self._validate_prompts()
+        self._validate_validator_keys()
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        """
+        Generate the DQ plan (or load from cache), then execute or defer for 
approval.
+
+        When ``dry_run=True`` the serialised plan dict is returned immediately 
—
+        no SQL is executed and no approval is requested.
+        When ``require_approval=True`` the task defers, presenting the plan to 
a
+        human reviewer; data-quality checks run only after the reviewer 
approves.
+
+        :returns: Dict with keys ``plan``, ``passed``, and ``results``.  On 
success
+            ``passed=True`` and ``results`` is a list of per-check result 
dicts.
+            For row-level checks the ``value`` entry in each result dict is 
itself
+            a dict with keys ``total``, ``invalid``, ``invalid_pct``, and
+            ``sample_violations`` rather than a raw scalar.
+            When ``dry_run=True`` ``passed=None`` and ``results=None`` — no SQL
+            is executed.  The ``plan`` key is always present in all modes.
+        :raises DQCheckFailedError: If any data-quality check fails threshold 
validation.
+        :raises TaskDeferred: When ``require_approval=True``, defers for human 
review
+            before executing the checks.
+        """
+        planner = self._build_planner()
+
+        schema_ctx = planner.build_schema_context(
+            table_names=self.table_names, schema_context=self.schema_context
+        )
+
+        self.log.info("Using schema context:\n%s", schema_ctx)
+
+        plan = self._load_or_generate_plan(planner, schema_ctx)
+
+        if self.dq_dry_run:
+            self.log.info(
+                "dry_run=True — skipping execution. Plan contains %d group(s), 
%d check(s).",
+                len(plan.groups),
+                len(plan.check_names),
+            )
+            for group in plan.groups:
+                self.log.info(
+                    "Group: %s\nChecks: %s\nSQL Query:\n%s\n",
+                    group.group_id,
+                    ", ".join(c.check_name for c in group.checks),
+                    group.query,
+                )
+            return {"plan": plan.model_dump(), "passed": None, "results": None}
+
+        if self.require_approval:
+            # Defer BEFORE execution — approval gates the SQL checks.
+            self.defer_for_approval(  # type: ignore[misc]
+                context,
+                plan.model_dump_json(),
+                body=self._build_dry_run_markdown(plan),
+            )
+            return {}  # type: ignore[return-value]  # pragma: no cover
+
+        return self._run_checks_and_report(context, planner, plan)
+
+    def _build_planner(self) -> SQLDQPlanner:
+        """Construct a 
:class:`~airflow.providers.common.ai.utils.dq_planner.SQLDQPlanner` from 
operator config."""
+        return SQLDQPlanner(
+            llm_hook=self.llm_hook,
+            db_hook=self.db_hook,
+            dialect=self.dialect,
+            datasource_config=self.datasource_config,
+            system_prompt=self.system_prompt,
+            agent_params=self.agent_params,
+            collect_unexpected=self.collect_unexpected,
+            unexpected_sample_size=self.unexpected_sample_size,
+            
validator_contexts=default_registry.build_llm_context(self.validators),
+            row_validators=self._collect_row_validators(),
+            row_level_sample_size=self.row_level_sample_size,
+        )
+
+    def _run_checks_and_report(
+        self,
+        context: Context,
+        planner: SQLDQPlanner,
+        plan: DQPlan,
+    ) -> dict[str, Any]:
+        """
+        Execute *plan* against the database, apply validators, and return the 
serialised report.
+
+        :raises DQCheckFailedError: If any data-quality check fails.
+        """
+        results_map = planner.execute_plan(plan)
+        check_results = self._validate_results(results_map, plan)
+
+        # Collect unexpected rows for failed validity/format checks.
+        if self.collect_unexpected:
+            failed_names = {r.check_name for r in check_results if not 
r.passed}
+            if failed_names:
+                unexpected_map = planner.execute_unexpected_queries(plan, 
failed_names)
+                self._attach_unexpected(check_results, unexpected_map)
+
+        report = DQReport.build(check_results)
+
+        output: dict[str, Any] = {
+            "plan": plan.model_dump(),
+            "passed": report.passed,
+            "results": [
+                {
+                    "check_name": r.check_name,
+                    "metric_key": r.metric_key,
+                    # RowLevelResult is not JSON-serialisable; convert to a 
plain dict.
+                    "value": (
+                        {
+                            "total": r.value.total,
+                            "invalid": r.value.invalid,
+                            "invalid_pct": r.value.invalid_pct,

Review Comment:
   The serialized output for row-level checks omits `sample_size`, but the
   documentation for the `RowLevelResult` output describes it as part of the
   returned `value` dict. Either include `sample_size` in this serialization or
   update the documentation to match what the operator actually returns.
   ```suggestion
                               "invalid_pct": r.value.invalid_pct,
                               "sample_size": r.value.sample_size,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to