mobuchowski commented on code in PR #47500:
URL: https://github.com/apache/airflow/pull/47500#discussion_r1985314038
##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -17,39 +17,41 @@
from __future__ import annotations
import asyncio
+import logging
import re
-from contextlib import suppress
from typing import TYPE_CHECKING
-from packaging.version import parse
-
-from airflow import __version__ as airflow_version
+from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
- from packaging.version import Version
-
from airflow.models.taskinstance import TaskInstance
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.openlineage.extractors.base import OperatorLineage
-_AIRFLOW_VERSION: Version = parse(parse(airflow_version).base_version)
+log = logging.getLogger(__name__)
def _get_logical_date(task_instance):
# todo: remove when min airflow version >= 3.0
- if parse("3") > _AIRFLOW_VERSION:
- return task_instance.execution_date
- return task_instance.logical_date
+ if AIRFLOW_V_3_0_PLUS:
+ dagrun = task_instance.get_template_context()["dag_run"]
+ return dagrun.logical_date or dagrun.run_after
Review Comment:
Yes, given that you don't have to deal with tests without `dag_run` :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]