This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1455f326031 feat: Add standardized SQL check representation for 
listeners (#66849)
1455f326031 is described below

commit 1455f326031a8a0482f8eef2d17718ce1d4f66fa
Author: Kacper Muda <[email protected]>
AuthorDate: Thu May 14 13:12:01 2026 +0200

    feat: Add standardized SQL check representation for listeners (#66849)
    
    * feat: Add standardized SQL check representation for listeners
    
    * Address review comments
    
    * Address more review comments
---
 .../airflow/providers/common/sql/operators/sql.py  |  475 +++++++-
 .../tests/unit/common/sql/operators/test_sql.py    | 1263 ++++++++++++++++++++
 providers/openlineage/docs/index.rst               |    4 +-
 providers/openlineage/pyproject.toml               |    4 +-
 uv.lock                                            |    4 +-
 5 files changed, 1736 insertions(+), 14 deletions(-)

diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py 
b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py
index 285b4657be6..21c8b6502b0 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py
@@ -20,12 +20,15 @@ from __future__ import annotations
 import ast
 import re
 from collections.abc import Callable, Iterable, Mapping, Sequence
+from dataclasses import dataclass
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, ClassVar, NoReturn, SupportsAbs
 
+from airflow.providers.common.compat.openlineage.check import 
require_openlineage_version
 from airflow.providers.common.compat.sdk import (
     AirflowException,
     AirflowFailException,
+    AirflowOptionalProviderFeatureException,
     AirflowSkipException,
     BaseHook,
     BaseOperator,
@@ -127,6 +130,46 @@ def default_output_processor(results: list[Any], 
descriptions: list[Sequence[Seq
     return results
 
 
+@dataclass
+class SQLCheckResult:
+    """Record of a single SQL check result."""
+
+    name: str
+    """Unique name identifying this check."""
+
+    check_type: str
+    """Classification of the check, e.g. ``"not_null"``, ``"row_count"``, 
``"unique"``."""
+
+    success: bool
+    """Whether the check found no issues (``True``) or found issues 
(``False``)."""
+
+    severity: str = "error"
+    """How severe a failure of this check is: ``"error"`` (raises, default), 
``"warn"`` (logs a warning only),
+     or ``"info"`` (informational, never causes task failure, like branching 
operator)."""
+
+    column: str | None = None
+    """Column the check refers to. When set, the assertion targets a specific 
column rather than
+    the whole table. Should match the column name in the schema."""
+
+    table: str | None = None
+    """Table the check was performed against."""
+
+    expected: str | None = None
+    """The expected value or threshold, serialized as a string, e.g. ``"> 0"`` 
or ``"[10, 100]"``."""
+
+    actual: str | None = None
+    """The actual value observed during the check, serialized as a string."""
+
+    content: str | None = None
+    """The check body — typically the SQL expression used to perform the 
check."""
+
+    description: str | None = None
+    """Human-readable description of what the check verifies."""
+
+    params: dict | None = None
+    """Arbitrary key-value pairs with check-specific context, e.g. accept_none 
or partition_clause."""
+
+
 class BaseSQLOperator(BaseOperator):
     """
     This is a base class for generic SQL Operator to get a DB Hook.
@@ -156,6 +199,7 @@ class BaseSQLOperator(BaseOperator):
         self.database = database
         self.hook_params = hook_params or {}
         self.retry_on_failure = retry_on_failure
+        self.check_results: list[SQLCheckResult] = []  # Used by listeners
 
     @classmethod
     # TODO: can be removed once Airflow min version for this provider is 3.0.0 
or higher
@@ -300,14 +344,116 @@ class BaseSQLOperator(BaseOperator):
             database_specific_lineage = None
 
         if database_specific_lineage is None:
-            return operator_lineage
+            if not self.check_results:
+                return operator_lineage
+            try:
+                return self._attach_check_facets(operator_lineage)
+            except AirflowOptionalProviderFeatureException as err:
+                self.log.debug("OpenLineage could not attach check facets: 
%s", err)
+                return operator_lineage
 
-        return OperatorLineage(
+        merged = OperatorLineage(
             inputs=operator_lineage.inputs + database_specific_lineage.inputs,
             outputs=operator_lineage.outputs + 
database_specific_lineage.outputs,
             run_facets=merge_dicts(operator_lineage.run_facets, 
database_specific_lineage.run_facets),
             job_facets=merge_dicts(operator_lineage.job_facets, 
database_specific_lineage.job_facets),
         )
+        if not self.check_results:
+            return merged
+        try:
+            return self._attach_check_facets(merged)
+        except AirflowOptionalProviderFeatureException as err:
+            self.log.debug("OpenLineage could not attach check facets: %s", 
err)
+            return merged
+
+    @require_openlineage_version(client_min_version="1.47.0")
+    def _attach_check_facets(self, operator_lineage: OperatorLineage) -> 
OperatorLineage:
+        """
+        Attach OpenLineage check-result facets to the given lineage object.
+
+        Requires openlineage-python >= 1.47.0, which introduced the extended 
``Assertion``
+        and ``TestExecution`` schemas (``name``, ``description``, 
``expected``, ``actual``, ``content``,
+        ``params`` fields).  The decorator raises 
``AirflowOptionalProviderFeatureException`` when
+        the client is absent or too old; callers are expected to catch that 
exception.
+
+        Results with a ``table`` set are attached 
as ``DataQualityAssertionsDatasetFacet`` on the matching
+        dataset (case-insensitive exact name match first, then ``.<table>`` suffix match).  Unmatched results, and results without a 
table, fall back
+        to a run-level ``TestRunFacet``.
+        """
+        from openlineage.client.facet_v2 import 
data_quality_assertions_dataset, test_run
+
+        by_table: dict[str | None, list[SQLCheckResult]] = {}
+        for r in self.check_results:
+            by_table.setdefault(r.table, []).append(r)
+
+        run_level: list[SQLCheckResult] = list(by_table.pop(None, []))
+
+        for table, results in by_table.items():
+            assertions = [
+                data_quality_assertions_dataset.Assertion(
+                    assertion=r.check_type,
+                    success=r.success,
+                    severity=r.severity,
+                    column=r.column,
+                    name=r.name,
+                    description=r.description,
+                    expected=r.expected,
+                    actual=r.actual,
+                    content=r.content,
+                    contentType="sql",
+                    params=r.params,
+                )
+                for r in results
+            ]
+            table_lower = table.lower()  # type: ignore[union-attr]
+            matched = False
+            for group in (operator_lineage.inputs, operator_lineage.outputs):
+                # Exact match takes priority over suffix match when both are 
present in the same group.
+                target = next(
+                    (ds for ds in group if ds.name.lower() == table_lower),
+                    None,
+                ) or next(
+                    (ds for ds in group if 
ds.name.lower().endswith(f".{table_lower}")),
+                    None,
+                )
+                if target is not None:
+                    target.facets = target.facets or {}
+                    existing = target.facets.get("dataQualityAssertions")
+                    facet_assertions = (
+                        existing.assertions + assertions if existing is not 
None else assertions
+                    )
+                    target.facets["dataQualityAssertions"] = (
+                        
data_quality_assertions_dataset.DataQualityAssertionsDatasetFacet(
+                            assertions=facet_assertions
+                        )
+                    )
+                    matched = True
+            if not matched:
+                run_level.extend(results)
+
+        if run_level:
+            tests = [
+                test_run.TestExecution(
+                    name=r.name,
+                    status="pass" if r.success else "fail",
+                    severity=r.severity,
+                    type=r.check_type,
+                    description=r.description,
+                    expected=r.expected,
+                    actual=r.actual,
+                    content=r.content,
+                    contentType="sql",
+                    params={"tested_column": r.column, "tested_table": 
r.table, **(r.params or {})},
+                )
+                for r in run_level
+            ]
+            operator_lineage.run_facets = operator_lineage.run_facets or {}
+            existing = operator_lineage.run_facets.get("test")
+            if existing is not None:
+                tests = existing.tests + tests
+            operator_lineage.run_facets["test"] = 
test_run.TestRunFacet(tests=tests)
+
+        return operator_lineage
 
 
 class SQLExecuteQueryOperator(BaseSQLOperator):
@@ -554,6 +700,9 @@ class SQLColumnCheckOperator(BaseSQLOperator):
                 self.column_mapping[column][check], result, tolerance
             )
 
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results()
+
         failed_tests = [
             f"Column: {col}\n\tCheck: {check},\n\tCheck Values: 
{check_values}\n"
             for col, checks in self.column_mapping.items()
@@ -685,6 +834,105 @@ class SQLColumnCheckOperator(BaseSQLOperator):
                 "'less than or equal to', use geq_to or leq_to."
             )
 
+    def _build_check_results(self) -> list[SQLCheckResult]:
+        try:
+            return [
+                SQLCheckResult(
+                    name=f"{col}.{check}",
+                    check_type=self._get_column_check_type(check, 
check_values),
+                    success=check_values.get("success", False),
+                    column=col,
+                    table=self.table,
+                    expected=self._format_column_expected(check_values),
+                    actual=str(check_values["result"]) if "result" in 
check_values else None,
+                    content=self.column_checks[check].format(column=col),
+                    description="Column-level statistical check against the 
configured threshold",
+                    params={
+                        k: v
+                        for k, v in {
+                            "equal_to": check_values.get("equal_to"),
+                            "greater_than": check_values.get("greater_than"),
+                            "geq_to": check_values.get("geq_to"),
+                            "less_than": check_values.get("less_than"),
+                            "leq_to": check_values.get("leq_to"),
+                            "tolerance": check_values.get("tolerance"),
+                            "accept_none": self.accept_none,
+                            "partition_clause": self.partition_clause,
+                            "check_partition_clause": 
check_values.get("partition_clause"),
+                        }.items()
+                        if v is not None
+                    }
+                    or None,
+                )
+                for col, checks in self.column_mapping.items()
+                for check, check_values in checks.items()
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
+    @staticmethod
+    def _get_column_check_type(check: str, check_values: dict) -> str:
+        """
+        Return a dbt-style check type for a column check based on the metric 
and comparison operators.
+
+        Range operators (``greater_than``, ``geq_to``, ``less_than``, 
``leq_to``) always yield
+        ``accepted_range``.  ``equal_to`` with a non-zero tolerance and a non-zero target value also 
yields ``accepted_range``
+        because the check expands to ``[value*(1-tol), value*(1+tol)]``.  For 
``equal_to``-only
+        checks without tolerance, ``null_check`` and ``unique_check`` use 
their semantic names
+        (``not_null``, ``unique``) when asserting the canonical zero value; 
any other target becomes
+        ``accepted_values``.  Custom check names fall through unchanged.
+        """
+        if any(k in check_values for k in ("greater_than", "geq_to", 
"less_than", "leq_to")):
+            return "accepted_range"
+        if "equal_to" in check_values:
+            if check_values.get("tolerance") and check_values["equal_to"] != 0:
+                return "accepted_range"
+            if check == "null_check" and check_values["equal_to"] == 0:
+                return "not_null"
+            if check == "unique_check" and check_values["equal_to"] == 0:
+                return "unique"
+            return "accepted_values"
+        return check
+
+    @staticmethod
+    def _format_column_expected(check_values: dict) -> str:
+        """
+        Return the expected string for a column check, expanding tolerance 
into actual bounds.
+
+        Without tolerance: shows raw comparison operators (e.g. ``">5, 
<=10"``).
+        With tolerance: computes and shows the actual relaxed bounds using the 
same arithmetic
+        as ``_get_match``, so ``equal_to=5, tolerance=0.1`` becomes ``">= 4.5, 
<= 5.5"``.
+        """
+        tol = check_values.get("tolerance")
+        if tol is None:
+            return ", ".join(
+                f"{op}{check_values[key]}"
+                for key, op in {
+                    "equal_to": "",
+                    "greater_than": ">",
+                    "geq_to": ">=",
+                    "less_than": "<",
+                    "leq_to": "<=",
+                }.items()
+                if key in check_values
+            )
+        tol = float(tol)  # The operator already treats it as numeric in 
`_get_match`.
+        parts = []
+        if "equal_to" in check_values:
+            v = check_values["equal_to"]
+            parts.append(f">= {v * (1 - tol)}")
+            parts.append(f"<= {v * (1 + tol)}")
+        if "greater_than" in check_values:
+            parts.append(f"> {check_values['greater_than'] * (1 - tol)}")
+        if "geq_to" in check_values:
+            parts.append(f">= {check_values['geq_to'] * (1 - tol)}")
+        if "less_than" in check_values:
+            parts.append(f"< {check_values['less_than'] * (1 + tol)}")
+        if "leq_to" in check_values:
+            parts.append(f"<= {check_values['leq_to'] * (1 + tol)}")
+        return ", ".join(parts)
+
 
 class SQLTableCheckOperator(BaseSQLOperator):
     """
@@ -754,6 +1002,16 @@ class SQLTableCheckOperator(BaseSQLOperator):
         hook = self.get_db_hook()
         records = hook.get_records(self.sql)
 
+        if records:
+            self.log.info("Record:\n%s", records)
+            for row in records:
+                check, result = row
+                self.checks[check]["result"] = str(result)
+                self.checks[check]["success"] = _parse_boolean(str(result))
+
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(records)
+
         if not records:
             # accept_none prevents an error from being thrown if there are no 
records in the table
             if self.accept_none:
@@ -767,12 +1025,6 @@ class SQLTableCheckOperator(BaseSQLOperator):
             # Otherwise, we'll raise an exception
             self._raise_exception(f"The following query returned zero rows: 
{self.sql}")
 
-        self.log.info("Record:\n%s", records)
-
-        for row in records:
-            check, result = row
-            self.checks[check]["success"] = _parse_boolean(str(result))
-
         failed_tests = [
             f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
             for check, check_values in self.checks.items()
@@ -809,6 +1061,36 @@ class SQLTableCheckOperator(BaseSQLOperator):
             for check_name, value in self.checks.items()
         )
 
+    def _build_check_results(self, records) -> list[SQLCheckResult]:
+        try:
+            return [
+                SQLCheckResult(
+                    name=check_name,
+                    check_type="expression_is_true",
+                    success=check_values.get("success", False),
+                    table=self.table,
+                    content=check_values.get("check_statement"),
+                    expected="all truthy",
+                    actual=check_values.get("result"),
+                    severity="warn" if (not records and self.accept_none) else 
"error",
+                    description="User-defined SQL expression must evaluate to 
true",
+                    params={
+                        k: v
+                        for k, v in {
+                            "accept_none": self.accept_none,
+                            "partition_clause": self.partition_clause,
+                            "check_partition_clause": 
check_values.get("partition_clause"),
+                        }.items()
+                        if v is not None
+                    }
+                    or None,
+                )
+                for check_name, check_values in self.checks.items()
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
 
 class SQLCheckOperator(BaseSQLOperator):
     """
@@ -874,6 +1156,10 @@ class SQLCheckOperator(BaseSQLOperator):
         records = self.get_db_hook().get_first(self.sql, self.parameters)
 
         self.log.info("Record: %s", records)
+
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(records)
+
         if not records:
             self._raise_exception(f"The following query returned zero rows: 
{self.sql}")
         elif isinstance(records, dict) and not all(records.values()):
@@ -883,6 +1169,31 @@ class SQLCheckOperator(BaseSQLOperator):
 
         self.log.info("Success.")
 
+    def _build_check_results(self, records) -> list[SQLCheckResult]:
+        try:
+            if not records:
+                success = False
+            elif isinstance(records, dict):
+                success = all(records.values())
+            else:
+                success = all(records)
+
+            return [
+                SQLCheckResult(
+                    name=self.task_id,
+                    check_type="expression_is_true",
+                    success=success,
+                    expected="all truthy",
+                    actual=str(records) if records else None,
+                    content=self.sql,
+                    description="All values in the first returned row must 
evaluate to true",
+                    params={"parameters": self.parameters} if self.parameters 
else None,
+                )
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
 
 class SQLValueCheckOperator(BaseSQLOperator):
     """
@@ -956,6 +1267,10 @@ class SQLValueCheckOperator(BaseSQLOperator):
     def execute(self, context: Context):
         self.log.info("Executing SQL check: %s", self.sql)
         records = self.get_db_hook().get_first(self.sql, self.parameters)
+
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(records)
+
         self.check_value(records)
 
     def _to_float(self, records):
@@ -973,6 +1288,54 @@ class SQLValueCheckOperator(BaseSQLOperator):
 
         return [record == numeric_pass_value_conv for record in 
numeric_records]
 
+    def _evaluate_check_value(self, records) -> bool:
+        """Return whether the value check passes without raising."""
+        if not records:
+            return False
+        pass_value_conv = _convert_to_float_if_possible(self.pass_value)
+        if not isinstance(pass_value_conv, float):
+            return all(self._get_string_matches(records, pass_value_conv))
+        try:
+            numeric_records = self._to_float(records)
+        except (ValueError, TypeError):
+            return False
+        return all(self._get_numeric_matches(numeric_records, pass_value_conv))
+
+    def _build_check_results(self, records) -> list[SQLCheckResult]:
+        try:
+            expected_str = self.pass_value
+            check_type = "accepted_values"
+
+            pass_value_conv = _convert_to_float_if_possible(self.pass_value)
+            if isinstance(pass_value_conv, float) and isinstance(self.tol, 
float):
+                expected_str = f">= {pass_value_conv * (1 - self.tol)}, <= 
{pass_value_conv * (1 + self.tol)}"
+                check_type = "accepted_range"
+
+            return [
+                SQLCheckResult(
+                    name=self.task_id,
+                    check_type=check_type,
+                    success=self._evaluate_check_value(records),
+                    expected=expected_str,
+                    actual=str(records) if records else None,
+                    content=self.sql,
+                    description="All values in the first returned row must 
match the expected value",
+                    params={
+                        k: v
+                        for k, v in {
+                            "pass_value": self.pass_value,
+                            "tolerance": self.tol,
+                            "parameters": self.parameters,
+                        }.items()
+                        if v is not None
+                    }
+                    or None,
+                )
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
 
 class SQLIntervalCheckOperator(BaseSQLOperator):
     """
@@ -1009,6 +1372,11 @@ class SQLIntervalCheckOperator(BaseSQLOperator):
         "relative_diff": lambda cur, ref: abs(cur - ref) / ref,
     }
 
+    ratio_formula_expressions = {
+        "max_over_min": "max({current}, {past}) / min({current}, {past})",
+        "relative_diff": "abs({current} - {past}) / {past}",
+    }
+
     def __init__(
         self,
         *,
@@ -1103,6 +1471,9 @@ class SQLIntervalCheckOperator(BaseSQLOperator):
                 threshold,
             )
 
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(all_tests_results)
+
         failed_tests = [single for single in all_tests_results.values() if not 
single["success"]]
         if failed_tests:
             self.log.warning(
@@ -1124,6 +1495,37 @@ class SQLIntervalCheckOperator(BaseSQLOperator):
 
         self.log.info("All tests have passed")
 
+    def _build_check_results(self, all_tests_results: dict[str, dict[str, 
Any]]) -> list[SQLCheckResult]:
+        try:
+            return [
+                SQLCheckResult(
+                    name=f"interval_{metric}",
+                    check_type="accepted_range",
+                    success=details["success"],
+                    table=self.table,
+                    expected=f"< {details['threshold']}",
+                    
content=self.ratio_formula_expressions.get(self.ratio_formula, 
self.ratio_formula).format(
+                        current=details["current_metric"], 
past=details["past_metric"]
+                    ),
+                    actual=str(details["ratio"]) if details["ratio"] is not 
None else "0",
+                    description="Ratio of current metric to historical 
baseline must be below the threshold",
+                    params={
+                        "threshold": details["threshold"],
+                        "days_back": self.days_back,
+                        "ratio_formula_name": self.ratio_formula,
+                        "ratio_formula": 
self.ratio_formula_expressions.get(self.ratio_formula),
+                        "date_filter_column": self.date_filter_column,
+                        "ignore_zero": self.ignore_zero,
+                        "current_metric": details["current_metric"],
+                        "past_metric": details["past_metric"],
+                    },
+                )
+                for metric, details in all_tests_results.items()
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
 
 class SQLThresholdCheckOperator(BaseSQLOperator):
     """
@@ -1202,6 +1604,10 @@ class SQLThresholdCheckOperator(BaseSQLOperator):
         }
 
         self.push(meta_data)
+
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(result, meta_data)
+
         if not meta_data["within_threshold"]:
             result = (
                 round(meta_data.get("result"), 2)  # type: ignore[arg-type]
@@ -1220,6 +1626,27 @@ class SQLThresholdCheckOperator(BaseSQLOperator):
 
         self.log.info("Test %s Successful.", self.task_id)
 
+    def _build_check_results(self, result: Any, meta_data: dict[str, Any]) -> 
list[SQLCheckResult]:
+        try:
+            return [
+                SQLCheckResult(
+                    name=self.task_id,
+                    check_type="accepted_range",
+                    success=meta_data["within_threshold"],
+                    expected=f">= {meta_data['min_threshold']}, <= 
{meta_data['max_threshold']}",
+                    actual=str(result),
+                    content=self.sql,
+                    description="SQL result must fall within the configured 
bounds",
+                    params={
+                        "min_threshold": str(self.min_threshold),
+                        "max_threshold": str(self.max_threshold),
+                    },
+                )
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
     def push(self, meta_data):
         """
         Send data check info and metadata to an external database.
@@ -1318,8 +1745,40 @@ class BranchSQLOperator(BaseSQLOperator, SkipMixin):
                 f"Unexpected query return result '{query_result}' type 
'{type(query_result)}'"
             )
 
+        # Save check results before raising exception, to be used by listeners
+        self.check_results = self._build_check_results(query_result)
+
         self.skip_all_except(context["ti"], self.follow_branch)
 
+    def _build_check_results(self, query_result: Any) -> list[SQLCheckResult]:
+        try:
+            return [
+                SQLCheckResult(
+                    name=self.task_id,
+                    check_type="expression_is_true",
+                    success=self.follow_branch == self.follow_task_ids_if_true,
+                    severity="info",
+                    expected="truthy",
+                    actual=str(query_result),
+                    content=self.sql,
+                    description="SQL result is evaluated as boolean to 
determine the execution branch",
+                    params={
+                        k: v
+                        for k, v in {
+                            "follow_task_ids_if_true": 
self.follow_task_ids_if_true,
+                            "follow_task_ids_if_false": 
self.follow_task_ids_if_false,
+                            "follow_branch": self.follow_branch,
+                            "parameters": self.parameters,
+                        }.items()
+                        if v is not None
+                    }
+                    or None,
+                )
+            ]
+        except Exception as err:
+            self.log.debug("Failed to build check results %s", err)
+            return []
+
 
 class SQLInsertRowsOperator(BaseSQLOperator):
     """
diff --git a/providers/common/sql/tests/unit/common/sql/operators/test_sql.py 
b/providers/common/sql/tests/unit/common/sql/operators/test_sql.py
index 08becbc12e1..14380b71736 100644
--- a/providers/common/sql/tests/unit/common/sql/operators/test_sql.py
+++ b/providers/common/sql/tests/unit/common/sql/operators/test_sql.py
@@ -35,6 +35,7 @@ from airflow.providers.common.sql.operators.sql import (
     BaseSQLOperator,
     BranchSQLOperator,
     SQLCheckOperator,
+    SQLCheckResult,
     SQLColumnCheckOperator,
     SQLExecuteQueryOperator,
     SQLInsertRowsOperator,
@@ -1622,6 +1623,1268 @@ class TestBaseSQLOperatorSubClass:
         mock_get_connection.assert_called_once_with("test_conn")
 
 
+class TestSQLColumnCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(column_mapping, **kwargs):
+        return SQLColumnCheckOperator(
+            task_id="test_task", table="test_table", column_mapping=column_mapping, **kwargs
+        )
+
+    def test_not_null_check(self):
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0}}})
+        op.column_mapping["col"]["null_check"]["result"] = 0
+        op.column_mapping["col"]["null_check"]["success"] = True
+        results = op._build_check_results()
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "col.null_check"
+        assert r.check_type == "not_null"
+        assert r.success is True
+        assert r.column == "col"
+        assert r.table == "test_table"
+        assert r.expected == "0"
+        assert r.actual == "0"
+        assert r.content == "SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)"
+        assert r.description == "Column-level statistical check against the configured threshold"
+        assert r.severity == "error"
+        assert r.params == {"equal_to": 0, "accept_none": True}
+
+    def test_unique_check(self):
+        op = self._make_operator({"col": {"unique_check": {"equal_to": 0}}})
+        op.column_mapping["col"]["unique_check"]["result"] = 0
+        op.column_mapping["col"]["unique_check"]["success"] = True
+        results = op._build_check_results()
+        assert len(results) == 1
+        r = results[0]
+        assert r.check_type == "unique"
+        assert r.content == "COUNT(col) - COUNT(DISTINCT(col))"
+        assert r.params == {"equal_to": 0, "accept_none": True}
+
+    def test_accepted_range_with_tolerance(self):
+        op = self._make_operator({"col": {"distinct_check": {"equal_to": 10, "tolerance": 0.1}}})
+        op.column_mapping["col"]["distinct_check"]["result"] = 10
+        op.column_mapping["col"]["distinct_check"]["success"] = True
+        results = op._build_check_results()
+        assert len(results) == 1
+        r = results[0]
+        assert r.check_type == "accepted_range"
+        assert r.expected == ">= 9.0, <= 11.0"
+        assert r.actual == "10"
+        assert r.content == "COUNT(DISTINCT(col))"
+        assert r.params == {"equal_to": 10, "tolerance": 0.1, "accept_none": True}
+
+    def test_accepted_range_geq_leq(self):
+        op = self._make_operator({"col": {"min": {"geq_to": 1, "leq_to": 100}}})
+        op.column_mapping["col"]["min"]["result"] = 50
+        op.column_mapping["col"]["min"]["success"] = True
+        results = op._build_check_results()
+        assert len(results) == 1
+        r = results[0]
+        assert r.check_type == "accepted_range"
+        assert r.expected == ">=1, <=100"
+        assert r.params == {"geq_to": 1, "leq_to": 100, "accept_none": True}
+
+    def test_multiple_checks_correct_names_and_order(self):
+        op = self._make_operator(
+            {
+                "col_a": {
+                    "null_check": {"equal_to": 0, "result": 0, "success": True},
+                    "min": {"geq_to": 1, "result": 5, "success": True},
+                },
+                "col_b": {
+                    "max": {"less_than": 100, "result": 50, "success": True},
+                },
+            }
+        )
+        results = op._build_check_results()
+        assert len(results) == 3
+        assert results[0].name == "col_a.null_check"
+        assert results[1].name == "col_a.min"
+        assert results[2].name == "col_b.max"
+
+    def test_failing_check_recorded_correctly(self):
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0}}})
+        op.column_mapping["col"]["null_check"]["result"] = 5
+        op.column_mapping["col"]["null_check"]["success"] = False
+        results = op._build_check_results()
+        assert len(results) == 1
+        assert results[0].success is False
+        assert results[0].actual == "5"
+
+    def test_partition_clause_in_params(self):
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0, "partition_clause": "year=2024"}}})
+        op.partition_clause = "region=us"
+        op.column_mapping["col"]["null_check"]["result"] = 0
+        op.column_mapping["col"]["null_check"]["success"] = True
+        results = op._build_check_results()
+        assert len(results) == 1
+        assert results[0].params == {
+            "equal_to": 0,
+            "accept_none": True,
+            "partition_clause": "region=us",
+            "check_partition_clause": "year=2024",
+        }
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0, "result": 0, "success": True}}})
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results()
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_records.return_value = [("col", "null_check", 0)]
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0}}})
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        # Records returned by the DB: (column, check_name, result)
+        mock_hook.return_value.get_records.return_value = [("col", "null_check", 0)]
+        op = self._make_operator({"col": {"null_check": {"equal_to": 0}}})
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "col.null_check"
+        assert r.check_type == "not_null"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column == "col"
+        assert r.table == "test_table"
+        assert r.expected == "0"
+        assert r.actual == "0"
+        assert r.content == "SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)"
+        assert r.description == "Column-level statistical check against the configured threshold"
+        assert r.params == {"equal_to": 0, "accept_none": True}
+
+    @pytest.mark.parametrize("operator", ["greater_than", "geq_to", "less_than", "leq_to", "equal_to"])
+    def test_known_comparison_operators_produce_expected_string(self, operator):
+        """Regression guard for existing operators. Does NOT detect new operators being added —
+        when adding a new comparison operator, add a case to this parametrize list manually."""
+        result = SQLColumnCheckOperator._format_column_expected({operator: 5})
+        assert result != ""
+
+    @pytest.mark.parametrize(
+        ("check", "check_values", "expected_type"),
+        [
+            # Range operators always win
+            ("null_check", {"greater_than": 0}, "accepted_range"),
+            ("col_count", {"geq_to": 10}, "accepted_range"),
+            ("max", {"less_than": 100}, "accepted_range"),
+            ("min", {"leq_to": 5}, "accepted_range"),
+            # equal_to + non-zero tolerance → range (tolerance expands to bounds)
+            ("distinct_check", {"equal_to": 10, "tolerance": 0.1}, "accepted_range"),
+            # equal_to without tolerance → semantic names for standard zero-value checks
+            ("null_check", {"equal_to": 0}, "not_null"),
+            ("unique_check", {"equal_to": 0}, "unique"),
+            # equal_to=0 with tolerance: condition requires equal_to != 0, so falls through
+            ("null_check", {"equal_to": 0, "tolerance": 0.1}, "not_null"),
+            ("distinct_check", {"equal_to": 0, "tolerance": 0.1}, "accepted_values"),
+            # equal_to with non-zero value on standard check names
+            ("null_check", {"equal_to": 5}, "accepted_values"),
+            ("unique_check", {"equal_to": 5}, "accepted_values"),
+            # generic check name with no comparison operators → returned unchanged
+            ("custom_metric", {}, "custom_metric"),
+            # equal_to on a non-null/unique check name
+            ("distinct_check", {"equal_to": 10}, "accepted_values"),
+        ],
+    )
+    def test_get_column_check_type(self, check, check_values, expected_type):
+        assert SQLColumnCheckOperator._get_column_check_type(check, check_values) == expected_type
+
+    @pytest.mark.parametrize(
+        ("check_values", "expected"),
+        [
+            ({"equal_to": 0}, "0"),
+            ({"equal_to": 5}, "5"),
+            ({"greater_than": 5}, ">5"),
+            ({"geq_to": 5}, ">=5"),
+            ({"less_than": 10}, "<10"),
+            ({"leq_to": 10}, "<=10"),
+            ({"greater_than": 0, "less_than": 100}, ">0, <100"),
+            ({"geq_to": 5, "leq_to": 10}, ">=5, <=10"),
+        ],
+    )
+    def test_format_column_expected_no_tolerance(self, check_values, expected):
+        assert SQLColumnCheckOperator._format_column_expected(check_values) == expected
+
+    @pytest.mark.parametrize(
+        ("check_values", "expected"),
+        [
+            ({"equal_to": 5, "tolerance": 0.1}, ">= 4.5, <= 5.5"),
+            ({"equal_to": 10, "tolerance": 0.5}, ">= 5.0, <= 15.0"),
+            ({"greater_than": 10, "tolerance": 0.1}, "> 9.0"),
+            ({"geq_to": 10, "tolerance": 0.1}, ">= 9.0"),
+            ({"less_than": 10, "tolerance": 0.1}, "< 11.0"),
+            ({"leq_to": 10, "tolerance": 0.1}, "<= 11.0"),
+            ({"geq_to": 10, "leq_to": 20, "tolerance": 0.1}, ">= 9.0, <= 22.0"),
+        ],
+    )
+    def test_format_column_expected_with_tolerance(self, check_values, expected):
+        assert SQLColumnCheckOperator._format_column_expected(check_values) == expected
+
+
+class TestSQLTableCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(checks, **kwargs):
+        return SQLTableCheckOperator(task_id="test_task", table="test_table", checks=checks, **kwargs)
+
+    def test_passing_check(self):
+        checks = {"row_count_check": {"check_statement": "COUNT(*) >= 3"}}
+        op = self._make_operator(checks)
+        op.checks["row_count_check"]["result"] = "1"
+        op.checks["row_count_check"]["success"] = True
+        records = [("row_count_check", 1)]
+        results = op._build_check_results(records)
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "row_count_check"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.table == "test_table"
+        assert r.content == "COUNT(*) >= 3"
+        assert r.expected == "all truthy"
+        assert r.actual == "1"
+        assert r.severity == "error"
+        assert r.params == {"accept_none": False}
+
+    def test_failing_check(self):
+        checks = {"row_count_check": {"check_statement": "COUNT(*) >= 3"}}
+        op = self._make_operator(checks)
+        op.checks["row_count_check"]["result"] = "0"
+        op.checks["row_count_check"]["success"] = False
+        results = op._build_check_results([("row_count_check", 0)])
+        assert len(results) == 1
+        assert results[0].success is False
+        assert results[0].actual == "0"
+
+    def test_accept_none_severity_warn_when_empty_records(self):
+        checks = {"row_count_check": {"check_statement": "COUNT(*) >= 3"}}
+        op = self._make_operator(checks, accept_none=True)
+        op.checks["row_count_check"]["success"] = False
+        results = op._build_check_results([])
+        assert len(results) == 1
+        assert results[0].severity == "warn"
+
+    def test_severity_error_when_records_present(self):
+        checks = {"row_count_check": {"check_statement": "COUNT(*) >= 3"}}
+        op = self._make_operator(checks, accept_none=True)
+        op.checks["row_count_check"]["result"] = "1"
+        op.checks["row_count_check"]["success"] = True
+        results = op._build_check_results([("row_count_check", 1)])
+        assert results[0].severity == "error"
+
+    def test_multiple_checks(self):
+        checks = {
+            "count_check": {"check_statement": "COUNT(*) > 0"},
+            "sum_check": {"check_statement": "SUM(val) > 0"},
+        }
+        op = self._make_operator(checks)
+        op.checks["count_check"].update({"result": "1", "success": True})
+        op.checks["sum_check"].update({"result": "1", "success": True})
+        results = op._build_check_results([("count_check", 1), ("sum_check", 1)])
+        assert len(results) == 2
+        assert results[0].name == "count_check"
+        assert results[1].name == "sum_check"
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        checks = {"row_count_check": {"check_statement": "COUNT(*) >= 3"}}
+        op = self._make_operator(checks)
+        op.checks["row_count_check"].update({"result": "1", "success": True})
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results([("row_count_check", 1)])
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLTableCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_records.return_value = [("row_count_check", 1)]
+        op = self._make_operator({"row_count_check": {"check_statement": "COUNT(*) >= 1"}})
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLTableCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        mock_hook.return_value.get_records.return_value = [("row_count_check", "1")]
+        op = self._make_operator({"row_count_check": {"check_statement": "COUNT(*) >= 1"}})
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "row_count_check"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table == "test_table"
+        assert r.expected == "all truthy"
+        assert r.actual == "1"
+        assert r.content == "COUNT(*) >= 1"
+        assert r.description == "User-defined SQL expression must evaluate to true"
+        assert r.params == {"accept_none": False}
+
+
+class TestSQLCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(**kwargs):
+        return SQLCheckOperator(task_id="test_task", sql="SELECT 1", **kwargs)
+
+    def test_all_truthy_records(self):
+        op = self._make_operator()
+        results = op._build_check_results([1, 1, 1])
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.expected == "all truthy"
+        assert r.actual == "[1, 1, 1]"
+        assert r.content == "SELECT 1"
+        assert r.params is None
+
+    def test_falsy_value_in_records(self):
+        op = self._make_operator()
+        results = op._build_check_results([1, 0, 1])
+        assert len(results) == 1
+        assert results[0].success is False
+
+    def test_empty_records(self):
+        op = self._make_operator()
+        results = op._build_check_results([])
+        assert len(results) == 1
+        r = results[0]
+        assert r.success is False
+        assert r.actual is None
+
+    def test_dict_records_all_true(self):
+        op = self._make_operator()
+        results = op._build_check_results({"A": True, "B": True})
+        assert len(results) == 1
+        assert results[0].success is True
+
+    def test_dict_records_not_all_true(self):
+        op = self._make_operator()
+        results = op._build_check_results({"A": True, "B": False})
+        assert len(results) == 1
+        assert results[0].success is False
+
+    def test_parameters_in_params(self):
+        op = self._make_operator(parameters="my_params")
+        results = op._build_check_results([1])
+        assert len(results) == 1
+        assert results[0].params == {"parameters": "my_params"}
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator()
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results([1])
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_first.return_value = [1]
+        op = self._make_operator()
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        mock_hook.return_value.get_first.return_value = [1, 2, 3]
+        op = self._make_operator()
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table is None
+        assert r.expected == "all truthy"
+        assert r.actual == "[1, 2, 3]"
+        assert r.content == "SELECT 1"
+        assert r.description == "All values in the first returned row must evaluate to true"
+        assert r.params is None
+
+
+class TestSQLValueCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(pass_value, tolerance=None):
+        return SQLValueCheckOperator(
+            task_id="test_task", sql="SELECT val FROM t", pass_value=pass_value, tolerance=tolerance
+        )
+
+    def test_exact_match_no_tolerance(self):
+        op = self._make_operator(pass_value="5")
+        results = op._build_check_results([5])
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "accepted_values"
+        assert r.success is True
+        assert r.expected == "5"
+        assert r.actual == "[5]"
+        assert r.content == "SELECT val FROM t"
+        assert r.params == {"pass_value": "5"}
+
+    def test_numeric_tolerance_produces_accepted_range(self):
+        op = self._make_operator(pass_value=5, tolerance=0.1)
+        results = op._build_check_results([5])
+        assert len(results) == 1
+        r = results[0]
+        assert r.check_type == "accepted_range"
+        assert r.expected == ">= 4.5, <= 5.5"
+        assert r.params == {"pass_value": "5", "tolerance": 0.1}
+
+    def test_non_numeric_pass_value_is_accepted_values(self):
+        op = self._make_operator(pass_value="hello")
+        results = op._build_check_results(["hello"])
+        assert len(results) == 1
+        assert results[0].check_type == "accepted_values"
+        assert results[0].expected == "hello"
+
+    def test_failing_check(self):
+        op = self._make_operator(pass_value="10")
+        results = op._build_check_results([99])
+        assert len(results) == 1
+        assert results[0].success is False
+
+    def test_parameters_in_params(self):
+        op = self._make_operator(pass_value="5")
+        op.parameters = {"key": "val"}
+        results = op._build_check_results([5])
+        assert len(results) == 1
+        assert results[0].params == {"pass_value": "5", "parameters": {"key": "val"}}
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator(pass_value="5")
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results([5])
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLValueCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_first.return_value = [5]
+        op = self._make_operator(pass_value="5")
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLValueCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        mock_hook.return_value.get_first.return_value = [5]
+        op = self._make_operator(pass_value="5")
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "accepted_values"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table is None
+        assert r.expected == "5"
+        assert r.actual == "[5]"
+        assert r.content == "SELECT val FROM t"
+        assert r.description == "All values in the first returned row must match the expected value"
+        assert r.params == {"pass_value": "5"}
+
+    @mock.patch.object(SQLValueCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results_with_tolerance(self, mock_hook):
+        mock_hook.return_value.get_first.return_value = [5]
+        op = self._make_operator(pass_value=5, tolerance=0.1)
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "accepted_range"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table is None
+        assert r.expected == ">= 4.5, <= 5.5"
+        assert r.actual == "[5]"
+        assert r.content == "SELECT val FROM t"
+        assert r.description == "All values in the first returned row must match the expected value"
+        assert r.params == {"pass_value": "5", "tolerance": 0.1}
+
+
+class TestSQLIntervalCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(metrics_thresholds, **kwargs):
+        return SQLIntervalCheckOperator(
+            task_id="test_task",
+            table="test_table",
+            metrics_thresholds=metrics_thresholds,
+            ratio_formula="max_over_min",
+            ignore_zero=True,
+            **kwargs,
+        )
+
+    def _make_all_tests_results(self, metric, current, past, threshold, ratio, success):
+        return {
+            metric: {
+                "metric": metric,
+                "current_metric": current,
+                "past_metric": past,
+                "threshold": threshold,
+                "ignore_zero": True,
+                "ratio": ratio,
+                "success": success,
+            }
+        }
+
+    def test_passing_metric(self):
+        op = self._make_operator({"f1": 1.5})
+        all_tests_results = self._make_all_tests_results("f1", 10, 9, 1.5, 1.1, True)
+        results = op._build_check_results(all_tests_results)
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "interval_f1"
+        assert r.check_type == "accepted_range"
+        assert r.success is True
+        assert r.table == "test_table"
+        assert r.expected == "< 1.5"
+        assert r.actual == "1.1"
+        assert r.content == "max(10, 9) / min(10, 9)"
+        assert r.description == "Ratio of current metric to historical baseline must be below the threshold"
+        assert r.params == {
+            "threshold": 1.5,
+            "days_back": -7,
+            "ratio_formula_name": "max_over_min",
+            "ratio_formula": "max({current}, {past}) / min({current}, {past})",
+            "date_filter_column": "ds",
+            "ignore_zero": True,
+            "current_metric": 10,
+            "past_metric": 9,
+        }
+
+    def test_failing_metric(self):
+        op = self._make_operator({"f1": 1.5})
+        all_tests_results = self._make_all_tests_results("f1", 10, 2, 1.5, 5.0, False)
+        results = op._build_check_results(all_tests_results)
+        assert len(results) == 1
+        assert results[0].success is False
+        assert results[0].actual == "5.0"
+
+    def test_zero_ratio_none(self):
+        op = self._make_operator({"f1": 1.5})
+        all_tests_results = self._make_all_tests_results("f1", 0, 10, 1.5, None, True)
+        results = op._build_check_results(all_tests_results)
+        assert len(results) == 1
+        assert results[0].actual == "0"
+        assert results[0].success is True
+
+    def test_multiple_metrics(self):
+        op = self._make_operator({"f1": 1.5, "f2": 2.0})
+        all_tests_results = {
+            "f1": {
+                "metric": "f1",
+                "current_metric": 10,
+                "past_metric": 9,
+                "threshold": 1.5,
+                "ignore_zero": True,
+                "ratio": 1.1,
+                "success": True,
+            },
+            "f2": {
+                "metric": "f2",
+                "current_metric": 10,
+                "past_metric": 2,
+                "threshold": 2.0,
+                "ignore_zero": True,
+                "ratio": 5.0,
+                "success": False,
+            },
+        }
+        results = op._build_check_results(all_tests_results)
+        assert len(results) == 2
+        assert results[0].name == "interval_f1"
+        assert results[0].success is True
+        assert results[1].name == "interval_f2"
+        assert results[1].success is False
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator({"f1": 1.5})
+        all_tests_results = self._make_all_tests_results("f1", 10, 9, 1.5, 1.1, True)
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results(all_tests_results)
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLIntervalCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_first.side_effect = [[10], [10]]
+        op = self._make_operator({"f1": 1.5})
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLIntervalCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        # execute() fetches sql2 (past/reference) first, then sql1 (current)
+        mock_hook.return_value.get_first.side_effect = [[9], [10]]
+        op = self._make_operator({"f1": 1.5})
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "interval_f1"
+        assert r.check_type == "accepted_range"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table == "test_table"
+        assert r.expected == "< 1.5"
+        # ratio = max(10, 9) / min(10, 9) = 10/9 ≈ 1.111, content substitutes actual values
+        assert r.content == "max(10, 9) / min(10, 9)"
+        assert r.actual == str(10 / 9)
+        assert r.description == "Ratio of current metric to historical baseline must be below the threshold"
+        assert r.params == {
+            "threshold": 1.5,
+            "days_back": -7,
+            "ratio_formula_name": "max_over_min",
+            "ratio_formula": "max({current}, {past}) / min({current}, {past})",
+            "date_filter_column": "ds",
+            "ignore_zero": True,
+            "current_metric": 10,
+            "past_metric": 9,
+        }
+
+    def test_all_ratio_formulas_have_content_expressions(self):
+        """Fails if a new ratio formula is added to ratio_formulas without a matching expression entry."""
+        op = self._make_operator({"f1": 1.5})
+        assert set(op.ratio_formulas.keys()) == set(op.ratio_formula_expressions.keys())
+
+
+class TestSQLThresholdCheckOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(min_threshold=1, max_threshold=100):
+        return SQLThresholdCheckOperator(
+            task_id="test_task",
+            sql="SELECT val FROM t",
+            min_threshold=min_threshold,
+            max_threshold=max_threshold,
+        )
+
+    def test_within_threshold(self):
+        op = self._make_operator(min_threshold=1, max_threshold=100)
+        meta_data = {"within_threshold": True, "min_threshold": 1.0, "max_threshold": 100.0}
+        results = op._build_check_results(50, meta_data)
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "accepted_range"
+        assert r.success is True
+        assert r.expected == ">= 1.0, <= 100.0"
+        assert r.actual == "50"
+        assert r.content == "SELECT val FROM t"
+        assert r.description == "SQL result must fall within the configured bounds"
+        assert r.params == {"min_threshold": "1", "max_threshold": "100"}
+
+    def test_outside_threshold(self):
+        op = self._make_operator(min_threshold=20, max_threshold=100)
+        meta_data = {"within_threshold": False, "min_threshold": 20.0, "max_threshold": 100.0}
+        results = op._build_check_results(10, meta_data)
+        assert len(results) == 1
+        assert results[0].success is False
+        assert results[0].actual == "10"
+        assert results[0].expected == ">= 20.0, <= 100.0"
+
+    def test_sql_threshold_raw_strings_preserved_in_params(self):
+        op = self._make_operator(min_threshold="SELECT MIN(val) FROM ref", max_threshold=100)
+        meta_data = {"within_threshold": True, "min_threshold": 5.0, "max_threshold": 100.0}
+        results = op._build_check_results(50, meta_data)
+        assert len(results) == 1
+        assert results[0].params == {
+            "min_threshold": "SELECT MIN(val) FROM ref",
+            "max_threshold": "100",
+        }
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator()
+        meta_data = {"within_threshold": True, "min_threshold": 1.0, "max_threshold": 100.0}
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results(50, meta_data)
+        assert results == []
+
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch.object(SQLThresholdCheckOperator, "get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _):
+        mock_hook.return_value.get_first.return_value = (50,)
+        op = self._make_operator(min_threshold=1, max_threshold=100)
+        op.execute(MagicMock())
+        assert op.check_results == []
+
+    @mock.patch.object(SQLThresholdCheckOperator, "get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook):
+        mock_hook.return_value.get_first.return_value = (50,)
+        op = self._make_operator(min_threshold=1, max_threshold=100)
+        op.execute(MagicMock())
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "accepted_range"
+        assert r.success is True
+        assert r.severity == "error"
+        assert r.column is None
+        assert r.table is None
+        assert r.expected == ">= 1.0, <= 100.0"
+        assert r.actual == "50"
+        assert r.content == "SELECT val FROM t"
+        assert r.description == "SQL result must fall within the configured bounds"
+        assert r.params == {"min_threshold": "1", "max_threshold": "100"}
+
+
+class TestBranchSQLOperatorBuildCheckResults:
+    @staticmethod
+    def _make_operator(**kwargs):
+        return BranchSQLOperator(
+            task_id="test_task",
+            sql="SELECT 1",
+            follow_task_ids_if_true=["branch_true"],
+            follow_task_ids_if_false=["branch_false"],
+            **kwargs,
+        )
+
+    def test_true_branch(self):
+        op = self._make_operator()
+        op.follow_branch = ["branch_true"]
+        results = op._build_check_results(1)
+        assert len(results) == 1
+        r = results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.severity == "info"
+        assert r.expected == "truthy"
+        assert r.actual == "1"
+        assert r.content == "SELECT 1"
+        assert r.description == "SQL result is evaluated as boolean to determine the execution branch"
+        assert r.params == {
+            "follow_task_ids_if_true": ["branch_true"],
+            "follow_task_ids_if_false": ["branch_false"],
+            "follow_branch": ["branch_true"],
+        }
+
+    def test_false_branch(self):
+        op = self._make_operator()
+        op.follow_branch = ["branch_false"]
+        results = op._build_check_results(0)
+        assert len(results) == 1
+        r = results[0]
+        assert r.success is False
+        assert r.actual == "0"
+        assert r.params == {
+            "follow_task_ids_if_true": ["branch_true"],
+            "follow_task_ids_if_false": ["branch_false"],
+            "follow_branch": ["branch_false"],
+        }
+
+    def test_parameters_included_when_set(self):
+        op = self._make_operator(parameters={"key": "val"})
+        op.follow_branch = ["branch_true"]
+        results = op._build_check_results(1)
+        assert len(results) == 1
+        assert results[0].params == {
+            "follow_task_ids_if_true": ["branch_true"],
+            "follow_task_ids_if_false": ["branch_false"],
+            "follow_branch": ["branch_true"],
+            "parameters": {"key": "val"},
+        }
+
+    def test_build_check_results_failure_returns_empty_list(self):
+        op = self._make_operator()
+        op.follow_branch = ["branch_true"]
+        with mock.patch(
+            "airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom")
+        ):
+            results = op._build_check_results(1)
+        assert results == []
+
+    @mock.patch.object(BranchSQLOperator, "skip_all_except")
+    @mock.patch("airflow.providers.common.sql.operators.sql.SQLCheckResult", side_effect=RuntimeError("boom"))
+    @mock.patch("airflow.providers.common.sql.operators.sql.BaseSQLOperator.get_db_hook")
+    def test_execute_unaffected_when_build_check_results_raises(self, mock_hook, _, mock_skip):
+        mock_hook.return_value.get_first.return_value = 1
+        op = self._make_operator()
+        op.execute({"ti": MagicMock()})
+        assert op.check_results == []
+        mock_skip.assert_called_once()
+
+    @mock.patch.object(BranchSQLOperator, "skip_all_except")
+    @mock.patch("airflow.providers.common.sql.operators.sql.BaseSQLOperator.get_db_hook")
+    def test_execute_populates_check_results(self, mock_hook, mock_skip):
+        mock_hook.return_value.get_first.return_value = 1
+        op = self._make_operator()
+        op.execute({"ti": MagicMock()})
+        assert len(op.check_results) == 1
+        r = op.check_results[0]
+        assert r.name == "test_task"
+        assert r.check_type == "expression_is_true"
+        assert r.success is True
+        assert r.severity == "info"
+        assert r.column is None
+        assert r.table is None
+        assert r.expected == "truthy"
+        assert r.actual == "1"
+        assert r.content == "SELECT 1"
+        assert r.description == "SQL result is evaluated as boolean to determine the execution branch"
+        assert r.params == {
+            "follow_task_ids_if_true": ["branch_true"],
+            "follow_task_ids_if_false": ["branch_false"],
+            "follow_branch": ["branch_true"],
+        }
+        mock_skip.assert_called_once()
+
+
+class TestSqlBaseOperatorAttachCheckFacets:
+    """Tests for BaseSQLOperator._attach_check_facets."""
+
+    @staticmethod
+    def _make_operator():
+        return SQLCheckOperator(task_id="test_task", sql="SELECT 1")
+
+    @staticmethod
+    def _dataset(name):
+        from openlineage.client.event_v2 import Dataset
+
+        return Dataset(namespace="default", name=name)
+
+    def test_empty_check_results_returns_lineage_unchanged(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        result = op._attach_check_facets(OperatorLineage())
+        assert result == OperatorLineage()
+        assert result.run_facets == {}
+
+    def test_old_openlineage_client_raises_optional_feature_exception(self):
+        try:
+            from airflow.providers.openlineage.extractors import OperatorLineage
+        except ImportError:
+            pytest.skip("openlineage provider not installed")
+        from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+        op = self._make_operator()
+        op.check_results = [SQLCheckResult(name="t", check_type="expr", success=True)]
+        with mock.patch(
+            "airflow.providers.common.compat.openlineage.check.metadata.version", return_value="1.46.0"
+        ):
+            with pytest.raises(AirflowOptionalProviderFeatureException):
+                op._attach_check_facets(OperatorLineage())
+
+    def test_run_facet_all_fields_populated(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="my_check",
+                check_type="expression_is_true",
+                success=True,
+                severity="warn",
+                column="col_a",
+                table=None,  # no table → run facet
+                expected="all truthy",
+                actual="[1, 1]",
+                content="SELECT col_a FROM t",
+                description="All values must be truthy",
+                params={"custom_key": "custom_val"},
+            )
+        ]
+        result = op._attach_check_facets(OperatorLineage())
+
+        facet = result.run_facets.get("test")
+        assert facet is not None
+        assert len(facet.tests) == 1
+        t = facet.tests[0]
+        assert t.name == "my_check"
+        assert t.status == "pass"
+        assert t.severity == "warn"
+        assert t.type == "expression_is_true"
+        assert t.description == "All values must be truthy"
+        assert t.expected == "all truthy"
+        assert t.actual == "[1, 1]"
+        assert t.content == "SELECT col_a FROM t"
+        assert t.contentType == "sql"
+        # params merges tested_column / tested_table with the SQLCheckResult params
+        assert t.params == {
+            "tested_column": "col_a",
+            "tested_table": None,
+            "custom_key": "custom_val",
+        }
+
+    def test_run_facet_failing_check_has_fail_status(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [SQLCheckResult(name="bad_check", check_type="not_null", success=False)]
+        result = op._attach_check_facets(OperatorLineage())
+
+        facet = result.run_facets.get("test")
+        assert facet is not None
+        assert facet.tests[0].status == "fail"
+
+    def test_dataset_facet_all_fields_populated(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col.null_check",
+                check_type="not_null",
+                success=True,
+                severity="error",
+                column="col",
+                table="test_table",
+                expected="0 nulls",
+                actual="0",
+                content="SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)",
+                description="Column must contain no nulls",
+                params={"accept_none": False},
+            )
+        ]
+        lineage = OperatorLineage(inputs=[self._dataset("myschema.test_table")])
+        result = op._attach_check_facets(lineage)
+
+        assert result.run_facets == {}
+        dq_facet = result.inputs[0].facets.get("dataQualityAssertions")
+        assert dq_facet is not None
+        assert len(dq_facet.assertions) == 1
+        a = dq_facet.assertions[0]
+        assert a.assertion == "not_null"
+        assert a.success is True
+        assert a.severity == "error"
+        assert a.column == "col"
+        assert a.name == "col.null_check"
+        assert a.description == "Column must contain no nulls"
+        assert a.expected == "0 nulls"
+        assert a.actual == "0"
+        assert a.content == "SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)"
+        assert a.contentType == "sql"
+        assert a.params == {"accept_none": False}
+
+    def test_dataset_facet_multiple_assertions_same_table(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col_a.null_check",
+                check_type="not_null",
+                success=True,
+                column="col_a",
+                table="test_table",
+                expected="0 nulls",
+                actual="0",
+            ),
+            SQLCheckResult(
+                name="col_b.unique_check",
+                check_type="unique",
+                success=False,
+                column="col_b",
+                table="test_table",
+                expected="0 duplicates",
+                actual="3",
+            ),
+            SQLCheckResult(
+                name="row_count_check",
+                check_type="expression_is_true",
+                success=True,
+                column=None,
+                table="test_table",
+                expected="all truthy",
+                actual="1",
+                content="COUNT(*) > 0",
+            ),
+        ]
+        lineage = OperatorLineage(inputs=[self._dataset("myschema.test_table")])
+        result = op._attach_check_facets(lineage)
+
+        assert result.run_facets == {}
+        dq_facet = result.inputs[0].facets.get("dataQualityAssertions")
+        assert dq_facet is not None
+        assert len(dq_facet.assertions) == 3
+
+        a0 = dq_facet.assertions[0]
+        assert a0.name == "col_a.null_check"
+        assert a0.assertion == "not_null"
+        assert a0.success is True
+        assert a0.column == "col_a"
+        assert a0.expected == "0 nulls"
+        assert a0.actual == "0"
+
+        a1 = dq_facet.assertions[1]
+        assert a1.name == "col_b.unique_check"
+        assert a1.assertion == "unique"
+        assert a1.success is False
+        assert a1.column == "col_b"
+        assert a1.expected == "0 duplicates"
+        assert a1.actual == "3"
+
+        a2 = dq_facet.assertions[2]
+        assert a2.name == "row_count_check"
+        assert a2.assertion == "expression_is_true"
+        assert a2.success is True
+        assert a2.column is None
+        assert a2.content == "COUNT(*) > 0"
+
+    def test_dataset_facet_attached_to_output_all_fields_populated(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col.null_check",
+                check_type="not_null",
+                success=True,
+                severity="error",
+                column="col",
+                table="target_table",
+                expected="0 nulls",
+                actual="0",
+                content="SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)",
+                description="Column must contain no nulls",
+                params={"accept_none": False},
+            )
+        ]
+        # table appears only in outputs (e.g. a post-write quality check)
+        lineage = OperatorLineage(outputs=[self._dataset("myschema.target_table")])
+        result = op._attach_check_facets(lineage)
+
+        assert result.run_facets == {}
+        dq_facet = result.outputs[0].facets.get("dataQualityAssertions")
+        assert dq_facet is not None
+        assert len(dq_facet.assertions) == 1
+        a = dq_facet.assertions[0]
+        assert a.assertion == "not_null"
+        assert a.success is True
+        assert a.severity == "error"
+        assert a.column == "col"
+        assert a.name == "col.null_check"
+        assert a.description == "Column must contain no nulls"
+        assert a.expected == "0 nulls"
+        assert a.actual == "0"
+        assert a.content == "SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)"
+        assert a.contentType == "sql"
+        assert a.params == {"accept_none": False}
+
+    def test_unmatched_table_falls_back_to_run_facet_with_table_context_in_params(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col.null_check",
+                check_type="not_null",
+                success=False,
+                severity="error",
+                column="col",
+                table="other_table",
+                expected="0 nulls",
+                actual="5",
+                content="SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)",
+                description="Column must contain no nulls",
+                params={"accept_none": False},
+            )
+        ]
+        lineage = OperatorLineage(inputs=[self._dataset("myschema.test_table")])
+        result = op._attach_check_facets(lineage)
+
+        assert result.inputs[0].facets == {}
+        facet = result.run_facets.get("test")
+        assert facet is not None
+        assert len(facet.tests) == 1
+        t = facet.tests[0]
+        assert t.name == "col.null_check"
+        assert t.status == "fail"
+        assert t.severity == "error"
+        assert t.type == "not_null"
+        assert t.description == "Column must contain no nulls"
+        assert t.expected == "0 nulls"
+        assert t.actual == "5"
+        assert t.content == "SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)"
+        assert t.contentType == "sql"
+        # tested_column / tested_table are injected alongside the SQLCheckResult params
+        assert t.params == {
+            "tested_column": "col",
+            "tested_table": "other_table",
+            "accept_none": False,
+        }
+
+    def test_exact_match_preferred_over_suffix_match(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col.null_check",
+                check_type="not_null",
+                success=True,
+                column="col",
+                table="orders",
+            )
+        ]
+        suffix_match = self._dataset("schema.orders")  # endswith match — listed first
+        exact_match = self._dataset("orders")  # exact match — listed second
+        lineage = OperatorLineage(inputs=[suffix_match, exact_match])
+        result = op._attach_check_facets(lineage)
+
+        assert result.run_facets == {}
+        # confirm ordering is preserved so index assertions below are unambiguous
+        assert result.inputs[0].name == "schema.orders"
+        assert result.inputs[1].name == "orders"
+        # suffix-match dataset must not receive the facet
+        assert result.inputs[0].facets == {}
+        # exact-match dataset must receive it
+        dq_facet = result.inputs[1].facets.get("dataQualityAssertions")
+        assert dq_facet is not None
+        assert dq_facet.assertions[0].name == "col.null_check"
+
+    def test_same_table_in_inputs_and_outputs_both_receive_assertions(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            SQLCheckResult(
+                name="col.null_check",
+                check_type="not_null",
+                success=True,
+                column="col",
+                table="orders",
+                expected="0 nulls",
+                actual="0",
+            )
+        ]
+        input_ds = self._dataset("schema.orders")
+        output_ds = self._dataset("schema.orders")
+        lineage = OperatorLineage(inputs=[input_ds], outputs=[output_ds])
+        result = op._attach_check_facets(lineage)
+
+        assert result.run_facets == {}
+        assert len(result.inputs) == 1
+        assert len(result.outputs) == 1
+
+        for ds in (result.inputs[0], result.outputs[0]):
+            dq_facet = ds.facets.get("dataQualityAssertions")
+            assert dq_facet is not None, f"missing facet on {ds.name}"
+            assert len(dq_facet.assertions) == 1
+            a = dq_facet.assertions[0]
+            assert a.name == "col.null_check"
+            assert a.assertion == "not_null"
+            assert a.success is True
+            assert a.column == "col"
+
+    def test_mixed_results_dataset_facet_and_run_facet_populated_correctly(self):
+        pytest.importorskip(
+            "openlineage.client", minversion="1.47.0", reason="openlineage-python >= 1.47.0 required"
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        op = self._make_operator()
+        op.check_results = [
+            # matched input dataset → DataQualityAssertionsDatasetFacet
+            SQLCheckResult(
+                name="col_a.null_check",
+                check_type="not_null",
+                success=True,
+                column="col_a",
+                table="known_table",
+                expected="0 nulls",
+                actual="0",
+            ),
+            # table not in any dataset → falls back to TestRunFacet
+            SQLCheckResult(
+                name="col_b.null_check",
+                check_type="not_null",
+                success=False,
+                column="col_b",
+                table="unknown_table",
+                expected="0 nulls",
+                actual="2",
+            ),
+            # no table at all → directly to TestRunFacet
+            SQLCheckResult(
+                name="row_count_check",
+                check_type="expression_is_true",
+                success=True,
+                table=None,
+                expected="all truthy",
+                actual="1",
+                content="COUNT(*) > 0",
+            ),
+        ]
+        lineage = OperatorLineage(inputs=[self._dataset("myschema.known_table")])
+        result = op._attach_check_facets(lineage)
+
+        # matched check lands on the input dataset facet
+        dq_facet = result.inputs[0].facets.get("dataQualityAssertions")
+        assert dq_facet is not None
+        assert len(dq_facet.assertions) == 1
+        a = dq_facet.assertions[0]
+        assert a.name == "col_a.null_check"
+        assert a.assertion == "not_null"
+        assert a.success is True
+        assert a.column == "col_a"
+
+        # unmatched-table check and no-table check both land in the run facet
+        run_facet = result.run_facets.get("test")
+        assert run_facet is not None
+        assert len(run_facet.tests) == 2
+
+        t0 = run_facet.tests[0]
+        assert t0.name == "row_count_check"
+        assert t0.status == "pass"
+        assert t0.params["tested_column"] is None
+        assert t0.params["tested_table"] is None
+
+        t1 = run_facet.tests[1]
+        assert t1.name == "col_b.null_check"
+        assert t1.status == "fail"
+        assert t1.params["tested_column"] == "col_b"
+        assert t1.params["tested_table"] == "unknown_table"
+
+
 class TestSQLInsertRowsOperator:
     @mock.patch.object(SQLInsertRowsOperator, "get_db_hook")
     def test_insert_rows_operator_with_preoperator(self, mock_get_db_hook):
diff --git a/providers/openlineage/docs/index.rst b/providers/openlineage/docs/index.rst
index 2a748dd3269..acf88d949a4 100644
--- a/providers/openlineage/docs/index.rst
+++ b/providers/openlineage/docs/index.rst
@@ -110,8 +110,8 @@ PIP package                                 Version required
 ``apache-airflow-providers-common-sql``     ``>=1.32.0``
 ``apache-airflow-providers-common-compat``  ``>=1.14.3``
 ``attrs``                                   ``>=22.2``
-``openlineage-integration-common``          ``>=1.46.0``
-``openlineage-python``                      ``>=1.46.0``
+``openlineage-integration-common``          ``>=1.47.0``
+``openlineage-python``                      ``>=1.47.0``
 ==========================================  ==================
 
 Cross provider package dependencies
diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml
index bfd1eaee1e5..7d5d799889c 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -63,8 +63,8 @@ dependencies = [
     "apache-airflow-providers-common-sql>=1.32.0",
     "apache-airflow-providers-common-compat>=1.14.3",  # use next version
     "attrs>=22.2",
-    "openlineage-integration-common>=1.46.0",
-    "openlineage-python>=1.46.0",
+    "openlineage-integration-common>=1.47.0",
+    "openlineage-python>=1.47.0",
 ]
 
 # The optional dependencies should be modified in place in the generated file
diff --git a/uv.lock b/uv.lock
index 46ff48ab046..4066b76d5c0 100644
--- a/uv.lock
+++ b/uv.lock
@@ -6392,8 +6392,8 @@ requires-dist = [
     { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" },
     { name = "apache-airflow-providers-common-sql", editable = "providers/common/sql" },
     { name = "attrs", specifier = ">=22.2" },
-    { name = "openlineage-integration-common", specifier = ">=1.46.0" },
-    { name = "openlineage-python", specifier = ">=1.46.0" },
+    { name = "openlineage-integration-common", specifier = ">=1.47.0" },
+    { name = "openlineage-python", specifier = ">=1.47.0" },
     { name = "sqlalchemy", marker = "extra == 'sqlalchemy'", specifier = ">=1.4.54" },
 ]
 provides-extras = ["sqlalchemy"]

Reply via email to