mobuchowski commented on code in PR #44996:
URL: https://github.com/apache/airflow/pull/44996#discussion_r1901237375
##########
providers/src/airflow/providers/google/cloud/openlineage/mixins.py:
##########
@@ -199,115 +201,166 @@ def _get_dataset(self, table: dict, dataset_type: str)
-> Dataset:
table_name = table.get("tableId")
dataset_name = f"{project}.{dataset}.{table_name}"
- dataset_schema = self._get_table_schema_safely(dataset_name)
+ dataset_facets = self._get_table_facets_safely(dataset_name)
if dataset_type == "input":
# Logic specific to creating InputDataset (if needed)
return InputDataset(
namespace=BIGQUERY_NAMESPACE,
name=dataset_name,
- facets={
- "schema": dataset_schema,
- }
- if dataset_schema
- else {},
+ facets=dataset_facets,
)
elif dataset_type == "output":
# Logic specific to creating OutputDataset (if needed)
return OutputDataset(
namespace=BIGQUERY_NAMESPACE,
name=dataset_name,
- facets={
- "schema": dataset_schema,
- }
- if dataset_schema
- else {},
+ facets=dataset_facets,
)
else:
raise ValueError("Invalid dataset_type. Must be 'input' or
'output'")
- def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet
| None:
+ def _get_table_facets_safely(self, table_name: str) -> dict[str,
DatasetFacet]:
try:
- return self._get_table_schema(table_name)
+ bq_table = self._client.get_table(table_name)
+ return get_facets_from_bq_table(bq_table)
except Exception as e:
- self.log.warning("Could not extract output schema from bigquery.
%s", e) # type: ignore[attr-defined]
- return None
-
- def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None:
- bq_table = self._client.get_table(table)
-
- if not bq_table._properties:
- return None
-
- fields = get_from_nullable_chain(bq_table._properties, ["schema",
"fields"])
- if not fields:
- return None
-
- return SchemaDatasetFacet(
- fields=[
- SchemaDatasetFacetFields(
- name=field.get("name"),
- type=field.get("type"),
- description=field.get("description"),
- )
- for field in fields
- ]
- )
+ self.log.warning("Could not extract facets from bigquery table:
`%s`. %s", table_name, e) # type: ignore[attr-defined]
+ return {}
- def _get_inputs_and_output_for_query_job(
+ def _get_inputs_and_outputs_for_query_job(
self, properties: dict
- ) -> tuple[list[InputDataset], OutputDataset | None]:
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
input_tables = get_from_nullable_chain(properties, ["statistics",
"query", "referencedTables"]) or []
output_table = get_from_nullable_chain(properties, ["configuration",
"query", "destinationTable"])
- inputs = [
+ inputs = tuple(
(self._get_input_dataset(input_table))
for input_table in input_tables
if input_table != output_table # Output table is in
`referencedTables` and needs to be removed
- ]
+ )
if not output_table:
- return inputs, None
+ return inputs, ()
output = self._get_output_dataset(output_table)
- if dataset_stat_facet :=
self._get_statistics_dataset_facet(properties):
+ if dataset_stat_facet :=
self._get_output_statistics_dataset_facet(properties):
output.outputFacets = output.outputFacets or {}
output.outputFacets["outputStatistics"] = dataset_stat_facet
- if cll_facet := self._get_column_level_lineage_facet(properties,
output, inputs):
+ if cll_facet :=
self._get_column_level_lineage_facet_for_query_job(properties, output, inputs):
output.facets = output.facets or {}
output.facets["columnLineage"] = cll_facet
- return inputs, output
+ return inputs, (output,)
+
+ def _get_inputs_and_outputs_for_load_job(
+ self, properties: dict
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
Review Comment:
Maybe use a tuple of lists instead? It feels like tuples of tuples have more
potential for subtle bugs...
##########
providers/src/airflow/providers/google/cloud/openlineage/mixins.py:
##########
@@ -199,115 +201,166 @@ def _get_dataset(self, table: dict, dataset_type: str)
-> Dataset:
table_name = table.get("tableId")
dataset_name = f"{project}.{dataset}.{table_name}"
- dataset_schema = self._get_table_schema_safely(dataset_name)
+ dataset_facets = self._get_table_facets_safely(dataset_name)
if dataset_type == "input":
# Logic specific to creating InputDataset (if needed)
return InputDataset(
namespace=BIGQUERY_NAMESPACE,
name=dataset_name,
- facets={
- "schema": dataset_schema,
- }
- if dataset_schema
- else {},
+ facets=dataset_facets,
)
elif dataset_type == "output":
# Logic specific to creating OutputDataset (if needed)
return OutputDataset(
namespace=BIGQUERY_NAMESPACE,
name=dataset_name,
- facets={
- "schema": dataset_schema,
- }
- if dataset_schema
- else {},
+ facets=dataset_facets,
)
else:
raise ValueError("Invalid dataset_type. Must be 'input' or
'output'")
- def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet
| None:
+ def _get_table_facets_safely(self, table_name: str) -> dict[str,
DatasetFacet]:
try:
- return self._get_table_schema(table_name)
+ bq_table = self._client.get_table(table_name)
+ return get_facets_from_bq_table(bq_table)
except Exception as e:
- self.log.warning("Could not extract output schema from bigquery.
%s", e) # type: ignore[attr-defined]
- return None
-
- def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None:
- bq_table = self._client.get_table(table)
-
- if not bq_table._properties:
- return None
-
- fields = get_from_nullable_chain(bq_table._properties, ["schema",
"fields"])
- if not fields:
- return None
-
- return SchemaDatasetFacet(
- fields=[
- SchemaDatasetFacetFields(
- name=field.get("name"),
- type=field.get("type"),
- description=field.get("description"),
- )
- for field in fields
- ]
- )
+ self.log.warning("Could not extract facets from bigquery table:
`%s`. %s", table_name, e) # type: ignore[attr-defined]
+ return {}
- def _get_inputs_and_output_for_query_job(
+ def _get_inputs_and_outputs_for_query_job(
self, properties: dict
- ) -> tuple[list[InputDataset], OutputDataset | None]:
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
input_tables = get_from_nullable_chain(properties, ["statistics",
"query", "referencedTables"]) or []
output_table = get_from_nullable_chain(properties, ["configuration",
"query", "destinationTable"])
- inputs = [
+ inputs = tuple(
(self._get_input_dataset(input_table))
for input_table in input_tables
if input_table != output_table # Output table is in
`referencedTables` and needs to be removed
- ]
+ )
if not output_table:
- return inputs, None
+ return inputs, ()
output = self._get_output_dataset(output_table)
- if dataset_stat_facet :=
self._get_statistics_dataset_facet(properties):
+ if dataset_stat_facet :=
self._get_output_statistics_dataset_facet(properties):
output.outputFacets = output.outputFacets or {}
output.outputFacets["outputStatistics"] = dataset_stat_facet
- if cll_facet := self._get_column_level_lineage_facet(properties,
output, inputs):
+ if cll_facet :=
self._get_column_level_lineage_facet_for_query_job(properties, output, inputs):
output.facets = output.facets or {}
output.facets["columnLineage"] = cll_facet
- return inputs, output
+ return inputs, (output,)
+
+ def _get_inputs_and_outputs_for_load_job(
+ self, properties: dict
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
+ output =
self._get_output_dataset(properties["configuration"]["load"]["destinationTable"])
+ output_table_schema_facet = output.facets.get("schema") if
output.facets else None
+
+ source_uris = properties["configuration"]["load"]["sourceUris"]
+ inputs = tuple(
+ InputDataset(
+ namespace=namespace,
+ name=name,
+ facets={"schema": output_table_schema_facet} if
output_table_schema_facet else {},
+ )
+ for namespace, name in
get_namespace_name_from_source_uris(source_uris)
+ )
+
+ if dataset_stat_facet :=
self._get_output_statistics_dataset_facet(properties):
+ output.outputFacets = output.outputFacets or {}
+ output.outputFacets["outputStatistics"] = dataset_stat_facet
+ if cll_facet :=
get_identity_column_lineage_facet(self._extract_column_names(output), inputs):
+ output.facets = {**output.facets, **cll_facet} if output.facets
else cll_facet
+ return inputs, (output,)
+
+ def _get_inputs_and_outputs_for_copy_job(
+ self, properties: dict
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
+ input_tables = get_from_nullable_chain(properties, ["configuration",
"copy", "sourceTables"]) or [
+ get_from_nullable_chain(properties, ["configuration", "copy",
"sourceTable"])
+ ]
+ inputs = tuple(self._get_input_dataset(input_table) for input_table in
input_tables)
+
+ output =
self._get_output_dataset(properties["configuration"]["copy"]["destinationTable"])
+ if dataset_stat_facet :=
self._get_output_statistics_dataset_facet(properties):
+ output.outputFacets = output.outputFacets or {}
+ output.outputFacets["outputStatistics"] = dataset_stat_facet
+ if cll_facet :=
get_identity_column_lineage_facet(self._extract_column_names(output), inputs):
+ output.facets = {**output.facets, **cll_facet} if output.facets
else cll_facet
+ return inputs, (output,)
+
+ def _get_inputs_and_outputs_for_extract_job(
+ self, properties: dict
+ ) -> tuple[tuple[InputDataset, ...], tuple[OutputDataset, ...]]:
+ source_table = get_from_nullable_chain(properties, ["configuration",
"extract", "sourceTable"])
+ input_dataset = self._get_input_dataset(source_table) if source_table
else None
+
+ destination_uris = get_from_nullable_chain(
+ properties, ["configuration", "extract", "destinationUris"]
+ ) or [get_from_nullable_chain(properties, ["configuration", "extract",
"destinationUri"])]
+
+ outputs = []
+ for namespace, name in
get_namespace_name_from_source_uris(destination_uris):
+ output_facets = {}
+ if input_dataset:
+ input_schema = input_dataset.facets.get("schema") if
input_dataset.facets else None
+ if input_schema:
+ output_facets["schema"] = input_schema
+ if cll_facet := get_identity_column_lineage_facet(
+ self._extract_column_names(input_dataset), [input_dataset]
+ ):
+ output_facets = {**output_facets, **cll_facet}
+ outputs.append(OutputDataset(namespace=namespace, name=name,
facets=output_facets))
+
+ inputs = (input_dataset,) if input_dataset else ()
+ return inputs, tuple(outputs)
@staticmethod
def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
- if get_from_nullable_chain(properties, ["configuration", "query",
"query"]):
- # Exclude the query to avoid event size issues and duplicating
SqlJobFacet information.
- properties = copy.deepcopy(properties)
- properties["configuration"]["query"].pop("query")
- cache_hit = get_from_nullable_chain(properties, ["statistics",
"query", "cacheHit"])
- billed_bytes = get_from_nullable_chain(properties, ["statistics",
"query", "totalBytesBilled"])
+ job_type = get_from_nullable_chain(properties, ["configuration",
"jobType"])
+ cache_hit, billed_bytes = None, None
+ if job_type == "QUERY":
+ if get_from_nullable_chain(properties, ["configuration", "query",
"query"]):
+ # Exclude the query to avoid event size issues and duplicating
SqlJobFacet information.
+ properties = copy.deepcopy(properties)
+ properties["configuration"]["query"].pop("query")
+ cache_hit = get_from_nullable_chain(properties, ["statistics",
"query", "cacheHit"])
+ billed_bytes = get_from_nullable_chain(properties, ["statistics",
"query", "totalBytesBilled"])
+
return BigQueryJobRunFacet(
cached=str(cache_hit).lower() == "true",
billedBytes=int(billed_bytes) if billed_bytes else None,
properties=json.dumps(properties),
)
@staticmethod
- def _get_statistics_dataset_facet(
+ def _get_output_statistics_dataset_facet(
properties,
) -> OutputStatisticsOutputDatasetFacet | None:
- query_plan = get_from_nullable_chain(properties, chain=["statistics",
"query", "queryPlan"])
- if not query_plan:
- return None
+ job_type = get_from_nullable_chain(properties, ["configuration",
"jobType"])
+ out_rows, out_bytes = None, None
+ if job_type == "QUERY":
+ query_plan = get_from_nullable_chain(properties,
chain=["statistics", "query", "queryPlan"])
+ if not query_plan:
+ return None
+ out_stage = query_plan[-1]
Review Comment:
It would be nice to add a comment explaining magic indexing like that (e.g. why
the last stage of the query plan is the one that holds the output statistics).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]