kacpermuda commented on code in PR #50392:
URL: https://github.com/apache/airflow/pull/50392#discussion_r2084292177
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -309,3 +318,83 @@ def bulk_dump(self, table, tmp_file):
def bulk_load(self, table, tmp_file):
raise NotImplementedError()
+
+ def get_openlineage_database_info(self, connection) -> DatabaseInfo:
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+ return DatabaseInfo(
+ scheme=self.get_openlineage_database_dialect(connection),
+ authority=self._get_openlineage_authority(connection),
+ database=self.catalog,
+ information_schema_columns=[
+ "table_schema",
+ "table_name",
+ "column_name",
+ "ordinal_position",
+ "data_type",
+ "table_catalog",
+ ],
+ is_information_schema_cross_db=True,
+ )
+
+ def get_openlineage_database_dialect(self, _) -> str:
+ return "databricks"
+
+ def get_openlineage_default_schema(self) -> str | None:
+ return self.schema or "default"
+
+ def _get_openlineage_authority(self, _) -> str | None:
+ return self.host
+
+ def get_openlineage_database_specific_lineage(self, task_instance) -> OperatorLineage | None:
+ """
+ Generate OpenLineage metadata for a Databricks task instance based on
executed query IDs.
+
+ If a single query ID is present, attach an `ExternalQueryRunFacet` to
the lineage metadata.
+ If multiple query IDs are present, emits separate OpenLineage events
for each query instead.
+
+ Note that `get_openlineage_database_specific_lineage` is usually
called after task's execution,
+ so if multiple query IDs are present, both START and COMPLETE event
for each query will be emitted
+ after task's execution. If we are able to query Databricks for query
execution metadata,
+ query event times will correspond to actual query's start and finish
times.
+
+ Args:
+ task_instance: The Airflow TaskInstance object for which lineage
is being collected.
+
+ Returns:
+ An `OperatorLineage` object if a single query ID is found;
otherwise `None`.
+ """
+ from airflow.providers.common.compat.openlineage.facet import ExternalQueryRunFacet
+ from airflow.providers.databricks.utils.openlineage import (
+ emit_openlineage_events_for_databricks_queries,
+ )
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self.query_ids:
+ self.log.debug("openlineage: no databricks query ids found.")
+ return None
+
+ self.log.debug("openlineage: getting connection to get database info")
+ connection = self.get_connection(self.get_conn_id())
+ namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))
+
+ if len(self.query_ids) == 1:
+ self.log.debug("Attaching ExternalQueryRunFacet with single query_id to OpenLineage event.")
+ return OperatorLineage(
+ run_facets={
+ "externalQuery": ExternalQueryRunFacet(
+ externalQueryId=self.query_ids[0], source=namespace
+ )
+ }
+ )
+
+ self.log.info("Multiple query_ids found. Separate OpenLineage event will be emitted for each query.")
+ emit_openlineage_events_for_databricks_queries(
+ query_ids=self.query_ids,
+ query_source_namespace=namespace,
+ task_instance=task_instance,
+ hook=self,
+ )
Review Comment:
I think this is mostly for backward compatibility. With the current logic, for
a single query we include the ExternalQuery facet in the task-level OpenLineage
(OL) event, so I wanted to keep that behaviour. One day we could switch to
emitting only separate per-query events, but we would also have to add SQL
parsing there. So I agree, but it could also be seen as a breaking change, so
it's probably something for the future and definitely for another PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]