kacpermuda commented on code in PR #66849:
URL: https://github.com/apache/airflow/pull/66849#discussion_r3234781625
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -300,14 +344,109 @@ def get_openlineage_facets_on_complete(self,
task_instance) -> OperatorLineage |
database_specific_lineage = None
if database_specific_lineage is None:
- return operator_lineage
+ if not self._check_results:
+ return operator_lineage
+ try:
+ return self._attach_check_facets(operator_lineage)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets:
%s", err)
+ return operator_lineage
- return OperatorLineage(
+ merged = OperatorLineage(
inputs=operator_lineage.inputs + database_specific_lineage.inputs,
outputs=operator_lineage.outputs +
database_specific_lineage.outputs,
run_facets=merge_dicts(operator_lineage.run_facets,
database_specific_lineage.run_facets),
job_facets=merge_dicts(operator_lineage.job_facets,
database_specific_lineage.job_facets),
)
+ if not self._check_results:
+ return merged
+ try:
+ return self._attach_check_facets(merged)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets: %s",
err)
+ return merged
+
+ @require_openlineage_version(client_min_version="1.47.0")
+ def _attach_check_facets(self, operator_lineage: OperatorLineage) ->
OperatorLineage:
+ """
+ Attach OpenLineage check-result facets to the given lineage object.
+
+ Requires openlineage-python >= 1.47.0, which introduced the extended
``Assertion``
+ and ``TestExecution`` schemas (``name``, ``description``,
``expected``, ``actual``, ``content``,
+ ``params`` fields). The decorator raises
``AirflowOptionalProviderFeatureException`` when
+ the client is absent or too old; callers are expected to catch that
exception.
+
+ Results with a ``table`` set are attached
as``DataQualityAssertionsDatasetFacet`` on the matching
+ dataset (matched by suffix). Unmatched results, and results without a
table, fall back
+ to a run-level ``TestRunFacet``.
+ """
+ from openlineage.client.facet_v2 import
data_quality_assertions_dataset, test_run
+
+ by_table: dict[str | None, list[SQLCheckResult]] = {}
+ for r in self._check_results:
+ by_table.setdefault(r.table, []).append(r)
+
+ run_level: list[SQLCheckResult] = list(by_table.pop(None, []))
+
+ for table, results in by_table.items():
+ assertions = [
+ data_quality_assertions_dataset.Assertion(
+ assertion=r.check_type,
+ success=r.success,
+ severity=r.severity,
+ column=r.column,
+ name=r.name,
+ description=r.description,
+ expected=r.expected,
+ actual=r.actual,
+ content=r.content,
+ contentType="sql",
+ params=r.params,
+ )
+ for r in results
+ ]
+ matched = False
+ for dataset in [*operator_lineage.inputs,
*operator_lineage.outputs]:
Review Comment:
Not sure if this scenario is possible with any of the currently supported
operators (most check operators only read from the table and never write to
it), but the OL implementation here can be future-proofed, so it's definitely
a good idea to think about it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]