JDarDagran commented on code in PR #39614:
URL: https://github.com/apache/airflow/pull/39614#discussion_r1600010667


##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+    """Facet that represents relevant statistics of bigquery run.
+
+    This facet is used to provide statistics about bigquery run.
+
+    :param cached: BigQuery caches query results. Rest of the statistics will 
not be provided for cached queries.
+    :param billedBytes: How many bytes BigQuery bills for.
+    :param properties: Full property tree of BigQuery run.
+    """
+
+    cached: bool
+    billedBytes: int | None = field(default=None)
+    properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+    """Get object from nested structure of objects, where it's not guaranteed 
that all keys in the nested structure exist.
+
+    Intended to replace chain of `dict.get()` statements.
+
+    Example usage:
+
+    .. code-block:: python
+
+        if (
+            not job._properties.get("statistics")
+            or not job._properties.get("statistics").get("query")
+            or not 
job._properties.get("statistics").get("query").get("referencedTables")
+        ):
+            return None
+        result = 
job._properties.get("statistics").get("query").get("referencedTables")
+
+    becomes:
+
+    .. code-block:: python
+
+        result = get_from_nullable_chain(job._properties, ["statistics", "query", 
"referencedTables"])
+        if not result:
+            return None
+    """
+    chain.reverse()
+    try:
+        while chain:
+            next_key = chain.pop()
+            if isinstance(source, dict):
+                source = source.get(next_key)
+            else:
+                source = getattr(source, next_key)
+        return source
+    except AttributeError:
+        return None
+
+
+class _BigQueryOpenLineageMixin:
+    def get_openlineage_facets_on_complete(self, _):
+        """
+        Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+        This method retrieves statistics for the specified job_ids using the 
BigQueryDatasetsProvider.
+        It calls BigQuery API, retrieving input and output dataset info from 
it, as well as run-level
+        usage statistics.
+
+        Run facets should contain:
+            - ExternalQueryRunFacet
+            - BigQueryJobRunFacet
+
+        Job facets should contain:
+            - SqlJobFacet if operator has self.sql
+
+        Input datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+
+        Output datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+            - OutputStatisticsOutputDatasetFacet
+        """
+        from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self.job_id:
+            return OperatorLineage()
+
+        run_facets: dict[str, BaseFacet] = {
+            "externalQuery": 
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+        }
+
+        job_facets = {"sql": 
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+        self.client = self.hook.get_client(project_id=self.hook.project_id)
+        job_ids = self.job_id
+        if isinstance(self.job_id, str):
+            job_ids = [self.job_id]
+        inputs, outputs = [], []
+        for job_id in job_ids:
+            inner_inputs, inner_outputs, inner_run_facets = 
self.get_facets(job_id=job_id)
+            inputs.extend(inner_inputs)
+            outputs.extend(inner_outputs)
+            run_facets.update(inner_run_facets)
+
+        return OperatorLineage(
+            inputs=inputs,
+            outputs=outputs,
+            run_facets=run_facets,
+            job_facets=job_facets,
+        )
+
+    def get_facets(self, job_id: str):
+        inputs = []
+        outputs = []
+        run_facets: dict[str, BaseFacet] = {}
+        if hasattr(self, "log"):
+            self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+        try:
+            job = self.client.get_job(job_id=job_id)  # type: ignore
+            props = job._properties
+
+            if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+                raise ValueError(f"Trying to extract data from running 
bigquery job: `{job_id}`")
+
+            run_facets["bigQuery_job"] = 
self._get_bigquery_job_run_facet(props)

Review Comment:
   I agree, it's more a matter of how to deal with the changes/deprecations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to