mobuchowski commented on code in PR #50392:
URL: https://github.com/apache/airflow/pull/50392#discussion_r2084436355


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -309,3 +318,83 @@ def bulk_dump(self, table, tmp_file):
 
     def bulk_load(self, table, tmp_file):
         raise NotImplementedError()
+
+    def get_openlineage_database_info(self, connection) -> DatabaseInfo:
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+        return DatabaseInfo(
+            scheme=self.get_openlineage_database_dialect(connection),
+            authority=self._get_openlineage_authority(connection),
+            database=self.catalog,
+            information_schema_columns=[
+                "table_schema",
+                "table_name",
+                "column_name",
+                "ordinal_position",
+                "data_type",
+                "table_catalog",
+            ],
+            is_information_schema_cross_db=True,
+        )
+
+    def get_openlineage_database_dialect(self, _) -> str:
+        return "databricks"
+
+    def get_openlineage_default_schema(self) -> str | None:
+        return self.schema or "default"
+
+    def _get_openlineage_authority(self, _) -> str | None:
+        return self.host
+
+    def get_openlineage_database_specific_lineage(self, task_instance) -> 
OperatorLineage | None:
+        """
+        Generate OpenLineage metadata for a Databricks task instance based on 
executed query IDs.
+
+        If a single query ID is present, attach an `ExternalQueryRunFacet` to 
the lineage metadata.
+        If multiple query IDs are present, emits separate OpenLineage events 
for each query instead.
+
+        Note that `get_openlineage_database_specific_lineage` is usually 
called after task's execution,
+        so if multiple query IDs are present, both START and COMPLETE event 
for each query will be emitted
+        after task's execution. If we are able to query Databricks for query 
execution metadata,
+        query event times will correspond to actual query's start and finish 
times.
+
+        Args:
+            task_instance: The Airflow TaskInstance object for which lineage 
is being collected.
+
+        Returns:
+            An `OperatorLineage` object if a single query ID is found; 
otherwise `None`.
+        """
+        from airflow.providers.common.compat.openlineage.facet import 
ExternalQueryRunFacet
+        from airflow.providers.databricks.utils.openlineage import (
+            emit_openlineage_events_for_databricks_queries,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self.query_ids:
+            self.log.debug("openlineage: no databricks query ids found.")
+            return None
+
+        self.log.debug("openlineage: getting connection to get database info")
+        connection = self.get_connection(self.get_conn_id())
+        namespace = 
SQLParser.create_namespace(self.get_openlineage_database_info(connection))
+
+        if len(self.query_ids) == 1:
+            self.log.debug("Attaching ExternalQueryRunFacet with single 
query_id to OpenLineage event.")
+            return OperatorLineage(
+                run_facets={
+                    "externalQuery": ExternalQueryRunFacet(
+                        externalQueryId=self.query_ids[0], source=namespace
+                    )
+                }
+            )
+
+        self.log.info("Multiple query_ids found. Separate OpenLineage event 
will be emitted for each query.")
+        emit_openlineage_events_for_databricks_queries(
+            query_ids=self.query_ids,
+            query_source_namespace=namespace,
+            task_instance=task_instance,
+            hook=self,
+        )

Review Comment:
   Yeah, I agree. At some point we might want to do that: hide it behind a
flag at first, and later turn it on by default in some future Airflow version,
for example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to