kacpermuda commented on code in PR #47500:
URL: https://github.com/apache/airflow/pull/47500#discussion_r1985175589


##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -17,39 +17,41 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 import re
-from contextlib import suppress
 from typing import TYPE_CHECKING
 
-from packaging.version import parse
-
-from airflow import __version__ as airflow_version
+from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
-    from packaging.version import Version
-
     from airflow.models.taskinstance import TaskInstance
     from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
     from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
     from airflow.providers.openlineage.extractors.base import OperatorLineage
 
 
-_AIRFLOW_VERSION: Version = parse(parse(airflow_version).base_version)
+log = logging.getLogger(__name__)
 
 
 def _get_logical_date(task_instance):
     # todo: remove when min airflow version >= 3.0
-    if parse("3") > _AIRFLOW_VERSION:
-        return task_instance.execution_date
-    return task_instance.logical_date
+    if AIRFLOW_V_3_0_PLUS:
+        dagrun = task_instance.get_template_context()["dag_run"]
+        return dagrun.logical_date or dagrun.run_after

Review Comment:
   @mobuchowski Does this look right? I was trying to mimic the behavior of the OpenLineage (OL) listener.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to