kacpermuda commented on code in PR #39614:
URL: https://github.com/apache/airflow/pull/39614#discussion_r1599878088
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -15,24 +15,37 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains code related to OpenLineage and lineage extraction."""
-
from __future__ import annotations
+import copy
+import json
+import traceback
from typing import TYPE_CHECKING, Any
+from attr import define, field
from openlineage.client.facet import (
+ BaseFacet,
ColumnLineageDatasetFacet,
ColumnLineageDatasetFacetFieldsAdditional,
ColumnLineageDatasetFacetFieldsAdditionalInputFields,
DocumentationDatasetFacet,
+ ErrorMessageRunFacet,
+ OutputStatisticsOutputDatasetFacet,
SchemaDatasetFacet,
SchemaField,
)
+from openlineage.client.run import Dataset
if TYPE_CHECKING:
from google.cloud.bigquery.table import Table
- from openlineage.client.run import Dataset
+
+
+GITHUB_LOCATION = (
+
"https://raw.githubusercontent.com/apache/airflow/airflow/providers/google/cloud/utils/openli"
Review Comment:
Is this URL correct? Looks like it's missing an ending. Where is it used?
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
+
+ if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+ if hasattr(self, "log"):
+ self.log.debug("Found SCRIPT job. Extracting lineage from
child jobs instead.")
+ # SCRIPT job type has no input / output information but spawns
child jobs that have one
+ #
https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+ for child_job_id in self.client.list_jobs(parent_job=job_id):
+ child_job = self.client.get_job(job_id=child_job_id) #
type: ignore
+ child_inputs, child_output =
self._get_inputs_outputs_from_job(child_job._properties)
+ inputs.extend(child_inputs)
+ outputs.append(child_output)
+ else:
+ inputs, _output = self._get_inputs_outputs_from_job(props)
+ outputs.append(_output)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Cannot retrieve job details from
BigQuery.Client. %s", e, exc_info=True)
+ run_facets.update(
+ {
+ "errorMessage": ErrorMessageRunFacet(
Review Comment:
The facet key will change from `bigQuery_error` to `errorMessage`, and the facet
itself has different keys. I'm all in for using `ErrorMessageRunFacet` instead
of the `BigQueryErrorRunFacet` that's now in openlineage-common, but maybe we
should mention it somewhere? We are making a change without any deprecations,
and that may not be wrong (so feel free to ignore this); I'm just drawing
attention to it to make sure it's desired.
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
+
+ if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+ if hasattr(self, "log"):
+ self.log.debug("Found SCRIPT job. Extracting lineage from
child jobs instead.")
+ # SCRIPT job type has no input / output information but spawns
child jobs that have one
+ #
https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+ for child_job_id in self.client.list_jobs(parent_job=job_id):
+ child_job = self.client.get_job(job_id=child_job_id) #
type: ignore
+ child_inputs, child_output =
self._get_inputs_outputs_from_job(child_job._properties)
+ inputs.extend(child_inputs)
+ outputs.append(child_output)
+ else:
+ inputs, _output = self._get_inputs_outputs_from_job(props)
+ outputs.append(_output)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Cannot retrieve job details from
BigQuery.Client. %s", e, exc_info=True)
+ run_facets.update(
+ {
+ "errorMessage": ErrorMessageRunFacet(
+ message=f"{e}: {traceback.format_exc()}",
+ programmingLanguage="python",
+ )
+ }
+ )
+ deduplicated_outputs = self._deduplicate_outputs(outputs)
+ # For complex scripts there can be multiple outputs - in that case
keep them all in `outputs` and
+ # leave the `output` empty to avoid providing misleading information.
When the script has a single
+ # output (f.e. a single statement with some variable declarations),
treat it as a regular non-script
+ # job and put the output in `output` as an addition to new `outputs`.
`output` is deprecated.
+ return inputs, deduplicated_outputs, run_facets
+
+ def _deduplicate_outputs(self, outputs: list[Dataset | None]) ->
list[Dataset]:
+ # Sources are the same so we can compare only names
+ final_outputs = {}
+ for single_output in outputs:
+ if not single_output:
+ continue
+ key = single_output.name
+ if key not in final_outputs:
+ final_outputs[key] = single_output
+ continue
+
+ # No OutputStatisticsOutputDatasetFacet is added to duplicated
outputs as we can not determine
+ # if the rowCount or size can be summed together.
+ single_output.facets.pop("outputStatistics", None)
+ final_outputs[key] = single_output
+
+ return list(final_outputs.values())
+
+ def _get_inputs_outputs_from_job(self, properties: dict) ->
tuple[list[Dataset], Dataset | None]:
+ dataset_stat_facet = self._get_statistics_dataset_facet(properties)
Review Comment:
nit: we could create `dataset_stat_facet` inside the first `if`, after line
278, where it's actually needed
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
+
+ if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+ if hasattr(self, "log"):
+ self.log.debug("Found SCRIPT job. Extracting lineage from
child jobs instead.")
+ # SCRIPT job type has no input / output information but spawns
child jobs that have one
+ #
https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+ for child_job_id in self.client.list_jobs(parent_job=job_id):
+ child_job = self.client.get_job(job_id=child_job_id) #
type: ignore
+ child_inputs, child_output =
self._get_inputs_outputs_from_job(child_job._properties)
+ inputs.extend(child_inputs)
+ outputs.append(child_output)
+ else:
+ inputs, _output = self._get_inputs_outputs_from_job(props)
+ outputs.append(_output)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Cannot retrieve job details from
BigQuery.Client. %s", e, exc_info=True)
+ run_facets.update(
+ {
+ "errorMessage": ErrorMessageRunFacet(
+ message=f"{e}: {traceback.format_exc()}",
+ programmingLanguage="python",
+ )
+ }
+ )
+ deduplicated_outputs = self._deduplicate_outputs(outputs)
+ # For complex scripts there can be multiple outputs - in that case
keep them all in `outputs` and
+ # leave the `output` empty to avoid providing misleading information.
When the script has a single
+ # output (f.e. a single statement with some variable declarations),
treat it as a regular non-script
+ # job and put the output in `output` as an addition to new `outputs`.
`output` is deprecated.
+ return inputs, deduplicated_outputs, run_facets
+
+ def _deduplicate_outputs(self, outputs: list[Dataset | None]) ->
list[Dataset]:
+ # Sources are the same so we can compare only names
+ final_outputs = {}
+ for single_output in outputs:
+ if not single_output:
+ continue
+ key = single_output.name
+ if key not in final_outputs:
+ final_outputs[key] = single_output
+ continue
+
+ # No OutputStatisticsOutputDatasetFacet is added to duplicated
outputs as we can not determine
+ # if the rowCount or size can be summed together.
+ single_output.facets.pop("outputStatistics", None)
+ final_outputs[key] = single_output
+
+ return list(final_outputs.values())
+
+ def _get_inputs_outputs_from_job(self, properties: dict) ->
tuple[list[Dataset], Dataset | None]:
+ dataset_stat_facet = self._get_statistics_dataset_facet(properties)
+ input_tables = get_from_nullable_chain(properties, ["statistics",
"query", "referencedTables"]) or []
+ output_table = get_from_nullable_chain(properties, ["configuration",
"query", "destinationTable"])
+ inputs = [self._get_dataset(input_table) for input_table in
input_tables]
+ if output_table:
+ output = self._get_dataset(output_table)
+ if dataset_stat_facet:
+ output.facets.update({"outputStatistics": dataset_stat_facet})
+
+ return inputs, output
+
+ @staticmethod
+ def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
+ if get_from_nullable_chain(properties, ["configuration", "query",
"query"]):
+ # Exclude the query to avoid event size issues and duplicating
SqlJobFacet information.
+ properties = copy.deepcopy(properties)
+ properties["configuration"]["query"].pop("query")
+ cache_hit = get_from_nullable_chain(properties, ["statistics",
"query", "cacheHit"])
+ billed_bytes = get_from_nullable_chain(properties, ["statistics",
"query", "totalBytesBilled"])
+ return BigQueryJobRunFacet(
+ cached=str(cache_hit).lower() == "true",
+ billedBytes=int(billed_bytes) if billed_bytes else None,
+ properties=json.dumps(properties),
+ )
+
+ @staticmethod
+ def _get_statistics_dataset_facet(properties) ->
OutputStatisticsOutputDatasetFacet | None:
+ query_plan = get_from_nullable_chain(properties, chain=["statistics",
"query", "queryPlan"])
+ if not query_plan:
+ return None
+
+ out_stage = query_plan[-1]
+ out_rows = out_stage.get("recordsWritten", None)
+ out_bytes = out_stage.get("shuffleOutputBytes", None)
+ if out_bytes and out_rows:
+ return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows),
size=int(out_bytes))
+ return None
+
+ def _get_dataset(self, table: dict) -> Dataset:
+ project = table.get("projectId")
+ dataset = table.get("datasetId")
+ table_name = table.get("tableId")
+ dataset_name = f"{project}.{dataset}.{table_name}"
+
+ dataset_schema = self._get_table_safely(dataset_name)
+ return Dataset(
+ namespace=BIGQUERY_NAMESPACE,
+ name=dataset_name,
+ facets={
+ "schema": dataset_schema,
+ }
+ if dataset_schema
+ else {},
+ )
+
+ def _get_table_safely(self, table_name):
Review Comment:
```suggestion
def _get_table_schema_safely(self, table_name: str) ->
SchemaDatasetFacet | None:
```
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
Review Comment:
Just a question, as I'm not sure how it should work: should this facet be
defined here or maybe in the OL provider? Should the schema JSON file be here, in the OL
provider, or nowhere? How does adding new facets for OL work in Airflow, when we
are not in the OL repo?
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
+
+ if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+ if hasattr(self, "log"):
+ self.log.debug("Found SCRIPT job. Extracting lineage from
child jobs instead.")
+ # SCRIPT job type has no input / output information but spawns
child jobs that have one
+ #
https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+ for child_job_id in self.client.list_jobs(parent_job=job_id):
+ child_job = self.client.get_job(job_id=child_job_id) #
type: ignore
+ child_inputs, child_output =
self._get_inputs_outputs_from_job(child_job._properties)
+ inputs.extend(child_inputs)
+ outputs.append(child_output)
+ else:
+ inputs, _output = self._get_inputs_outputs_from_job(props)
+ outputs.append(_output)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Cannot retrieve job details from
BigQuery.Client. %s", e, exc_info=True)
+ run_facets.update(
+ {
+ "errorMessage": ErrorMessageRunFacet(
+ message=f"{e}: {traceback.format_exc()}",
+ programmingLanguage="python",
+ )
+ }
+ )
+ deduplicated_outputs = self._deduplicate_outputs(outputs)
+ # For complex scripts there can be multiple outputs - in that case
keep them all in `outputs` and
+ # leave the `output` empty to avoid providing misleading information.
When the script has a single
+ # output (f.e. a single statement with some variable declarations),
treat it as a regular non-script
+ # job and put the output in `output` as an addition to new `outputs`.
`output` is deprecated.
+ return inputs, deduplicated_outputs, run_facets
+
+ def _deduplicate_outputs(self, outputs: list[Dataset | None]) ->
list[Dataset]:
+ # Sources are the same so we can compare only names
+ final_outputs = {}
+ for single_output in outputs:
+ if not single_output:
+ continue
+ key = single_output.name
+ if key not in final_outputs:
+ final_outputs[key] = single_output
+ continue
+
+ # No OutputStatisticsOutputDatasetFacet is added to duplicated
outputs as we can not determine
+ # if the rowCount or size can be summed together.
+ single_output.facets.pop("outputStatistics", None)
+ final_outputs[key] = single_output
+
+ return list(final_outputs.values())
+
+ def _get_inputs_outputs_from_job(self, properties: dict) ->
tuple[list[Dataset], Dataset | None]:
+ dataset_stat_facet = self._get_statistics_dataset_facet(properties)
+ input_tables = get_from_nullable_chain(properties, ["statistics",
"query", "referencedTables"]) or []
+ output_table = get_from_nullable_chain(properties, ["configuration",
"query", "destinationTable"])
+ inputs = [self._get_dataset(input_table) for input_table in
input_tables]
+ if output_table:
+ output = self._get_dataset(output_table)
+ if dataset_stat_facet:
+ output.facets.update({"outputStatistics": dataset_stat_facet})
+
+ return inputs, output
+
+ @staticmethod
+ def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
+ if get_from_nullable_chain(properties, ["configuration", "query",
"query"]):
+ # Exclude the query to avoid event size issues and duplicating
SqlJobFacet information.
+ properties = copy.deepcopy(properties)
+ properties["configuration"]["query"].pop("query")
+ cache_hit = get_from_nullable_chain(properties, ["statistics",
"query", "cacheHit"])
+ billed_bytes = get_from_nullable_chain(properties, ["statistics",
"query", "totalBytesBilled"])
+ return BigQueryJobRunFacet(
+ cached=str(cache_hit).lower() == "true",
+ billedBytes=int(billed_bytes) if billed_bytes else None,
+ properties=json.dumps(properties),
+ )
+
+ @staticmethod
+ def _get_statistics_dataset_facet(properties) ->
OutputStatisticsOutputDatasetFacet | None:
Review Comment:
For example, this change from `BigQueryStatisticsDatasetFacet` to
`OutputStatisticsOutputDatasetFacet` is awesome as the keys are the same and
the facet will be available under the same key, so the user will not really see
any difference. 🎉
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
+
+ if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
+ if hasattr(self, "log"):
+ self.log.debug("Found SCRIPT job. Extracting lineage from
child jobs instead.")
+ # SCRIPT job type has no input / output information but spawns
child jobs that have one
+ #
https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
+ for child_job_id in self.client.list_jobs(parent_job=job_id):
+ child_job = self.client.get_job(job_id=child_job_id) #
type: ignore
+ child_inputs, child_output =
self._get_inputs_outputs_from_job(child_job._properties)
+ inputs.extend(child_inputs)
+ outputs.append(child_output)
+ else:
+ inputs, _output = self._get_inputs_outputs_from_job(props)
+ outputs.append(_output)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Cannot retrieve job details from
BigQuery.Client. %s", e, exc_info=True)
+ run_facets.update(
+ {
+ "errorMessage": ErrorMessageRunFacet(
+ message=f"{e}: {traceback.format_exc()}",
+ programmingLanguage="python",
+ )
+ }
+ )
+ deduplicated_outputs = self._deduplicate_outputs(outputs)
+ # For complex scripts there can be multiple outputs - in that case
keep them all in `outputs` and
+ # leave the `output` empty to avoid providing misleading information.
When the script has a single
+ # output (f.e. a single statement with some variable declarations),
treat it as a regular non-script
+ # job and put the output in `output` as an addition to new `outputs`.
`output` is deprecated.
+ return inputs, deduplicated_outputs, run_facets
+
+ def _deduplicate_outputs(self, outputs: list[Dataset | None]) ->
list[Dataset]:
+ # Sources are the same so we can compare only names
+ final_outputs = {}
+ for single_output in outputs:
+ if not single_output:
+ continue
+ key = single_output.name
+ if key not in final_outputs:
+ final_outputs[key] = single_output
+ continue
+
+ # No OutputStatisticsOutputDatasetFacet is added to duplicated
outputs as we can not determine
+ # if the rowCount or size can be summed together.
+ single_output.facets.pop("outputStatistics", None)
+ final_outputs[key] = single_output
+
+ return list(final_outputs.values())
+
+ def _get_inputs_outputs_from_job(self, properties: dict) ->
tuple[list[Dataset], Dataset | None]:
+ dataset_stat_facet = self._get_statistics_dataset_facet(properties)
+ input_tables = get_from_nullable_chain(properties, ["statistics",
"query", "referencedTables"]) or []
+ output_table = get_from_nullable_chain(properties, ["configuration",
"query", "destinationTable"])
+ inputs = [self._get_dataset(input_table) for input_table in
input_tables]
+ if output_table:
+ output = self._get_dataset(output_table)
+ if dataset_stat_facet:
+ output.facets.update({"outputStatistics": dataset_stat_facet})
+
+ return inputs, output
+
+ @staticmethod
+ def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
+ if get_from_nullable_chain(properties, ["configuration", "query",
"query"]):
+ # Exclude the query to avoid event size issues and duplicating
SqlJobFacet information.
+ properties = copy.deepcopy(properties)
+ properties["configuration"]["query"].pop("query")
+ cache_hit = get_from_nullable_chain(properties, ["statistics",
"query", "cacheHit"])
+ billed_bytes = get_from_nullable_chain(properties, ["statistics",
"query", "totalBytesBilled"])
+ return BigQueryJobRunFacet(
+ cached=str(cache_hit).lower() == "true",
+ billedBytes=int(billed_bytes) if billed_bytes else None,
+ properties=json.dumps(properties),
+ )
+
+ @staticmethod
+ def _get_statistics_dataset_facet(properties) ->
OutputStatisticsOutputDatasetFacet | None:
+ query_plan = get_from_nullable_chain(properties, chain=["statistics",
"query", "queryPlan"])
+ if not query_plan:
+ return None
+
+ out_stage = query_plan[-1]
+ out_rows = out_stage.get("recordsWritten", None)
+ out_bytes = out_stage.get("shuffleOutputBytes", None)
+ if out_bytes and out_rows:
+ return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows),
size=int(out_bytes))
+ return None
+
+ def _get_dataset(self, table: dict) -> Dataset:
+ project = table.get("projectId")
+ dataset = table.get("datasetId")
+ table_name = table.get("tableId")
+ dataset_name = f"{project}.{dataset}.{table_name}"
+
+ dataset_schema = self._get_table_safely(dataset_name)
+ return Dataset(
+ namespace=BIGQUERY_NAMESPACE,
+ name=dataset_name,
+ facets={
+ "schema": dataset_schema,
+ }
+ if dataset_schema
+ else {},
+ )
+
+ def _get_table_safely(self, table_name):
+ try:
+ return self._get_table_schema(table_name)
+ except Exception as e:
+ if hasattr(self, "log"):
+ self.log.warning("Could not extract output schema from
bigquery. %s", e)
+ return None
+
+ def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None:
+ bq_table = self.client.get_table(table)
+
+ if not bq_table._properties:
+ return None
+ table_prop = bq_table._properties
+
+ fields = get_from_nullable_chain(table_prop, ["schema", "fields"])
Review Comment:
```suggestion
fields = get_from_nullable_chain(bq_table._properties, ["schema",
"fields"])
```
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
Review Comment:
Maybe we should move this to the OpenLineage provider utils, in case we want
to use it in other providers as well?
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
+ Job facets should contain:
+ - SqlJobFacet if operator has self.sql
+
+ Input datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+
+ Output datasets should contain facets:
+ - DataSourceDatasetFacet
+ - SchemaDatasetFacet
+ - OutputStatisticsOutputDatasetFacet
+ """
+ from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.job_id:
+ return OperatorLineage()
+
+ run_facets: dict[str, BaseFacet] = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+ }
+
+ job_facets = {"sql":
SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+ self.client = self.hook.get_client(project_id=self.hook.project_id)
+ job_ids = self.job_id
+ if isinstance(self.job_id, str):
+ job_ids = [self.job_id]
+ inputs, outputs = [], []
+ for job_id in job_ids:
+ inner_inputs, inner_outputs, inner_run_facets =
self.get_facets(job_id=job_id)
+ inputs.extend(inner_inputs)
+ outputs.extend(inner_outputs)
+ run_facets.update(inner_run_facets)
+
+ return OperatorLineage(
+ inputs=inputs,
+ outputs=outputs,
+ run_facets=run_facets,
+ job_facets=job_facets,
+ )
+
+ def get_facets(self, job_id: str):
+ inputs = []
+ outputs = []
+ run_facets: dict[str, BaseFacet] = {}
+ if hasattr(self, "log"):
+ self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+ try:
+ job = self.client.get_job(job_id=job_id) # type: ignore
+ props = job._properties
+
+ if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
+ raise ValueError(f"Trying to extract data from running
bigquery job: `{job_id}`")
+
+ run_facets["bigQuery_job"] =
self._get_bigquery_job_run_facet(props)
Review Comment:
nit: maybe we should change `bigQuery_job` to `bigQueryJob`?
##########
airflow/providers/google/cloud/utils/openlineage.py:
##########
@@ -79,3 +92,265 @@ def get_identity_column_lineage_facet(
}
)
return column_lineage_facet
+
+
+@define
+class BigQueryJobRunFacet(BaseFacet):
+ """Facet that represents relevant statistics of bigquery run.
+
+ This facet is used to provide statistics about bigquery run.
+
+ :param cached: BigQuery caches query results. Rest of the statistics will
not be provided for cached queries.
+ :param billedBytes: How many bytes BigQuery bills for.
+ :param properties: Full property tree of BigQUery run.
+ """
+
+ cached: bool
+ billedBytes: int | None = field(default=None)
+ properties: str | None = field(default=None)
+
+
+def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
+ """Get object from nested structure of objects, where it's not guaranteed
that all keys in the nested structure exist.
+
+ Intended to replace chain of `dict.get()` statements.
+
+ Example usage:
+
+ .. code-block:: python
+
+ if (
+ not job._properties.get("statistics")
+ or not job._properties.get("statistics").get("query")
+ or not
job._properties.get("statistics").get("query").get("referencedTables")
+ ):
+ return None
+ result =
job._properties.get("statistics").get("query").get("referencedTables")
+
+ becomes:
+
+ .. code-block:: python
+
+ result = get_from_nullable_chain(properties, ["statistics", "query",
"queryPlan"])
+ if not result:
+ return None
+ """
+ chain.reverse()
+ try:
+ while chain:
+ next_key = chain.pop()
+ if isinstance(source, dict):
+ source = source.get(next_key)
+ else:
+ source = getattr(source, next_key)
+ return source
+ except AttributeError:
+ return None
+
+
+class _BigQueryOpenLineageMixin:
+ def get_openlineage_facets_on_complete(self, _):
+ """
+ Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+ This method retrieves statistics for the specified job_ids using the
BigQueryDatasetsProvider.
+ It calls BigQuery API, retrieving input and output dataset info from
it, as well as run-level
+ usage statistics.
+
+ Run facets should contain:
+ - ExternalQueryRunFacet
+ - BigQueryJobRunFacet
+
Review Comment:
```suggestion
Run facets should contain:
- ExternalQueryRunFacet
- BigQueryJobRunFacet
Run facets may contain:
- ErrorMessageRunFacet
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]