mobuchowski commented on code in PR #33959:
URL: https://github.com/apache/airflow/pull/33959#discussion_r1312831927
##########
airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -211,12 +212,12 @@ def get_request_url_params(
async def get_headers_tenants_from_connection(self) -> tuple[dict[str,
Any], str]:
"""Get Headers, tenants from the connection details."""
headers: dict[str, Any] = {}
- connection: Connection = await
sync_to_async(self.get_connection)(self.dbt_cloud_conn_id)
- tenant = self._get_tenant_domain(connection)
+ # connection: Connection = await
sync_to_async(self.get_connection)(self.dbt_cloud_conn_id)
Review Comment:
Should this line be commented out?
##########
airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -211,6 +212,27 @@ def on_kill(self) -> None:
):
self.log.info("Job run %s has been cancelled successfully.",
str(self.run_id))
+ @cached_property
+ def hook(self):
+ """Returns DBT Cloud hook."""
+ return DbtCloudHook(self.dbt_cloud_conn_id)
+
+ def get_openlineage_facets_on_complete(self, task_instance) ->
OperatorLineage:
+ """
+ Implementing _on_complete because job_run needs to be triggered first
in execute method.
+
+ This should send additional events only if operator
`wait_for_termination` is set to True.
+ """
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ if self.wait_for_termination is True:
Review Comment:
Can we capture lineage in deferrable mode?
##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -202,3 +204,11 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str):
self.log.error("Executor have not started before
`on_dag_run_failed`")
return
self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+
+
+def get_openlineage_listener() -> OpenLineageListener:
Review Comment:
Why do we need this helper function?
##########
airflow/providers/openlineage/plugins/openlineage.py:
##########
@@ -46,7 +47,5 @@ class OpenLineageProviderPlugin(AirflowPlugin):
name = "OpenLineageProviderPlugin"
if not _is_disabled():
- from airflow.providers.openlineage.plugins.listener import
OpenLineageListener
-
macros = [lineage_run_id, lineage_parent_id]
- listeners = [OpenLineageListener()]
+ listeners = [get_openlineage_listener()]
Review Comment:
Why do we need this change?
##########
airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import re
+from contextlib import suppress
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook
+ from airflow.providers.dbt.cloud.operators.dbt import
DbtCloudRunJobOperator
+ from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
+ from airflow.providers.openlineage.extractors.base import OperatorLineage
+
+
+def generate_openlineage_events_from_dbt_cloud_run(
Review Comment:
This method is pretty dense and would benefit from more documentation,
especially around the async parts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]