kacpermuda commented on code in PR #39614:
URL: https://github.com/apache/airflow/pull/39614#discussion_r1600097695
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+    """Facet that represents relevant statistics of a BigQuery run.
+
+    This facet is used to provide statistics about a BigQuery run.
+
+    :param cached: BigQuery caches query results. Rest of the statistics will not be provided for cached queries.
+    :param billedBytes: How many bytes BigQuery bills for.
+    :param properties: Full property tree of BigQuery run.
+    """
+
+    cached: bool
+    billedBytes: int | None = field(default=None)
+    properties: str | None = field(default=None)
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+    """Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist.
+
+    Intended to replace chain of `dict.get()` statements.
+
+    Example usage:
+
+    .. code-block:: python
+
+        if (
+            not job._properties.get("statistics")
+            or not job._properties.get("statistics").get("query")
+            or not job._properties.get("statistics").get("query").get("referencedTables")
+        ):
+            return None
+        result = job._properties.get("statistics").get("query").get("referencedTables")
+
+    becomes:
+
+    .. code-block:: python
+
+        result = get_from_nullable_chain(job._properties, ["statistics", "query", "referencedTables"])
+        if not result:
+            return None
+    """
+    chain.reverse()
+    try:
+        while chain:
+            next_key = chain.pop()
+            if isinstance(source, dict):
+                source = source.get(next_key)
+            else:
+                source = getattr(source, next_key)
+        return source
+    except AttributeError:
+        return None
+
+
+class _BigQueryOpenLineageMixin:
+    def get_openlineage_facets_on_complete(self, _):
+        """
+        Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+        This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider.
+        It calls the BigQuery API, retrieving input and output dataset info from it, as well as
+        run-level usage statistics.
+
+        Run facets should contain:
+            - ExternalQueryRunFacet
+            - BigQueryJobRunFacet
+
+        Job facets should contain:
+            - SqlJobFacet if operator has self.sql
+
+        Input datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+
+        Output datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+            - OutputStatisticsOutputDatasetFacet
+        """
+        from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self.job_id:
+            return OperatorLineage()
+
+        run_facets: dict[str, BaseFacet] = {
+            "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+        }
+
+        job_facets = {"sql": SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+        self.client = self.hook.get_client(project_id=self.hook.project_id)
+        job_ids = self.job_id
+        if isinstance(self.job_id, str):
+            job_ids = [self.job_id]
+        inputs, outputs = [], []
+        for job_id in job_ids:
+            inner_inputs, inner_outputs, inner_run_facets = self.get_facets(job_id=job_id)
+            inputs.extend(inner_inputs)
+            outputs.extend(inner_outputs)
+            run_facets.update(inner_run_facets)
+
+        return OperatorLineage(
+            inputs=inputs,
+            outputs=outputs,
+            run_facets=run_facets,
+            job_facets=job_facets,
+        )
+
+    def get_facets(self, job_id: str):
+        inputs = []
+        outputs = []
+        run_facets: dict[str, BaseFacet] = {}
+        if hasattr(self, "log"):
+            self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+        try:
+            job = self.client.get_job(job_id=job_id)  # type: ignore
+            props = job._properties
+
+            if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+                raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`")
+
+            run_facets["bigQuery_job"] = self._get_bigquery_job_run_facet(props)
+
+            if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+                if hasattr(self, "log"):
+                    self.log.debug("Found SCRIPT job. Extracting lineage from child jobs instead.")
+                # SCRIPT job type has no input / output information but spawns child jobs that have one
+                # https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+                for child_job_id in self.client.list_jobs(parent_job=job_id):
+                    child_job = self.client.get_job(job_id=child_job_id)  # type: ignore
+                    child_inputs, child_output = self._get_inputs_outputs_from_job(child_job._properties)
+                    inputs.extend(child_inputs)
+                    outputs.append(child_output)
+            else:
+                inputs, _output = self._get_inputs_outputs_from_job(props)
+                outputs.append(_output)
+        except Exception as e:
+            if hasattr(self, "log"):
+                self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
+            run_facets.update(
+                {
+                    "errorMessage": ErrorMessageRunFacet(
Review Comment:
Not sure either. E.g. for the parent facet we sent both the old and the new key for a while, but in the end some people can still be surprised. Maybe we could somehow add information (some deprecation message) to the facet itself (e.g. to BaseFacet) saying that the facet is deprecated, so that the consumer is notified about it? When it appears, the consumer would know that this facet should not be used and could look for a substitute. That's probably something we can discuss in an issue and leave this as it is for now.
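
To make the idea a bit more concrete, here is a rough sketch of what such a marker could look like. Everything below is made up for illustration only (the class name, the `deprecationMessage` field and the replacement facet name are not part of any existing OpenLineage API), just to show how a consumer could be notified from the facet itself:

```python
# Illustrative sketch only: "deprecationMessage" and the replacement facet name
# are hypothetical, not part of the current openlineage-python client.
from attr import define, field

from openlineage.client.facet import BaseFacet


@define
class DeprecatedBigQueryJobRunFacet(BaseFacet):
    """Carries the same data as BigQueryJobRunFacet plus a deprecation notice for consumers."""

    cached: bool
    billedBytes: int | None = field(default=None)
    properties: str | None = field(default=None)
    # A consumer that sees this field knows the facet is being phased out
    # and can switch to the facet named in the message.
    deprecationMessage: str = field(
        default="This facet is deprecated; use the `bigQueryJob` run facet instead."
    )
```

On the consumer side this would just be one more optional field to check, so existing consumers would not break; they would only get a hint that a substitute exists.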
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]