phanikumv commented on code in PR #66849:
URL: https://github.com/apache/airflow/pull/66849#discussion_r3234487499
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -300,14 +344,109 @@ def get_openlineage_facets_on_complete(self,
task_instance) -> OperatorLineage |
database_specific_lineage = None
if database_specific_lineage is None:
- return operator_lineage
+ if not self._check_results:
+ return operator_lineage
+ try:
+ return self._attach_check_facets(operator_lineage)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets:
%s", err)
+ return operator_lineage
- return OperatorLineage(
+ merged = OperatorLineage(
inputs=operator_lineage.inputs + database_specific_lineage.inputs,
outputs=operator_lineage.outputs +
database_specific_lineage.outputs,
run_facets=merge_dicts(operator_lineage.run_facets,
database_specific_lineage.run_facets),
job_facets=merge_dicts(operator_lineage.job_facets,
database_specific_lineage.job_facets),
)
+ if not self._check_results:
+ return merged
+ try:
+ return self._attach_check_facets(merged)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets: %s",
err)
+ return merged
+
+ @require_openlineage_version(client_min_version="1.47.0")
+ def _attach_check_facets(self, operator_lineage: OperatorLineage) ->
OperatorLineage:
+ """
+ Attach OpenLineage check-result facets to the given lineage object.
+
+ Requires openlineage-python >= 1.47.0, which introduced the extended
``Assertion``
+ and ``TestExecution`` schemas (``name``, ``description``,
``expected``, ``actual``, ``content``,
+ ``params`` fields). The decorator raises
``AirflowOptionalProviderFeatureException`` when
+ the client is absent or too old; callers are expected to catch that
exception.
+
+ Results with a ``table`` set are attached
as``DataQualityAssertionsDatasetFacet`` on the matching
+ dataset (matched by suffix). Unmatched results, and results without a
table, fall back
+ to a run-level ``TestRunFacet``.
+ """
+ from openlineage.client.facet_v2 import
data_quality_assertions_dataset, test_run
+
+ by_table: dict[str | None, list[SQLCheckResult]] = {}
+ for r in self._check_results:
+ by_table.setdefault(r.table, []).append(r)
+
+ run_level: list[SQLCheckResult] = list(by_table.pop(None, []))
+
+ for table, results in by_table.items():
+ assertions = [
+ data_quality_assertions_dataset.Assertion(
+ assertion=r.check_type,
+ success=r.success,
+ severity=r.severity,
+ column=r.column,
+ name=r.name,
+ description=r.description,
+ expected=r.expected,
+ actual=r.actual,
+ content=r.content,
+ contentType="sql",
+ params=r.params,
+ )
+ for r in results
+ ]
+ matched = False
+ for dataset in [*operator_lineage.inputs,
*operator_lineage.outputs]:
Review Comment:
`_attach_check_facets` matches only the first dataset. Is this expected? If
the same table appears in both inputs and outputs (e.g. read-then-write), or
if two inputs from different namespaces share a suffix, only the first match
gets the assertion and the others are silently skipped.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]