This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9c5d085c44 Replace `models.BaseOperator` to Task SDK one for DBT & Databricks (#52377)
f9c5d085c44 is described below
commit f9c5d085c445078403279de6e34deac86437d736
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Jun 28 18:37:46 2025 +0530
Replace `models.BaseOperator` to Task SDK one for DBT & Databricks (#52377)
---
.../airflow/providers/databricks/operators/databricks.py | 3 +--
.../providers/databricks/operators/databricks_repos.py | 2 +-
.../providers/databricks/operators/databricks_sql.py | 2 +-
.../databricks/operators/databricks_workflow.py | 3 +--
.../src/airflow/providers/databricks/version_compat.py | 10 ++++++++++
.../src/airflow/providers/dbt/cloud/operators/dbt.py | 14 +++++---------
.../src/airflow/providers/dbt/cloud/version_compat.py | 16 ++++++++++++++++
7 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 95d5783de0a..e341daaa8a0 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -29,7 +29,6 @@ from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.databricks.hooks.databricks import (
DatabricksHook,
RunLifeCycleState,
@@ -49,7 +48,7 @@ from airflow.providers.databricks.triggers.databricks import (
)
from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event
from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin
-from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
index 75e6f9f8f9f..2d5f6db8404 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
@@ -26,8 +26,8 @@ from typing import TYPE_CHECKING
from urllib.parse import urlsplit
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
+from airflow.providers.databricks.version_compat import BaseOperator
if TYPE_CHECKING:
try:
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
index 5e0f9cc7d24..5d9b08a6626 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
@@ -28,9 +28,9 @@ from typing import TYPE_CHECKING, Any, ClassVar
from databricks.sql.utils import ParamEscaper
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
+from airflow.providers.databricks.version_compat import BaseOperator
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 9f714fa06d9..21000494402 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -26,14 +26,13 @@ from typing import TYPE_CHECKING, Any
from mergedeep import merge
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState
from airflow.providers.databricks.plugins.databricks_workflow import (
WorkflowJobRepairAllFailedLink,
WorkflowJobRunLink,
store_databricks_job_run_link,
)
-from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.utils.task_group import TaskGroup
if TYPE_CHECKING:
diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py
index 48d122b6696..e7a259afb35 100644
--- a/providers/databricks/src/airflow/providers/databricks/version_compat.py
+++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py
@@ -33,3 +33,13 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import BaseOperator
+else:
+ from airflow.models import BaseOperator
+
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "BaseOperator",
+]
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 1a77ad1d0ca..882cd9073c0 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -24,8 +24,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
-from airflow.models import BaseOperator
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.dbt.cloud.hooks.dbt import (
DbtCloudHook,
DbtCloudJobRunException,
@@ -34,18 +32,16 @@ from airflow.providers.dbt.cloud.hooks.dbt import (
)
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
+from airflow.providers.dbt.cloud.version_compat import (
+ BaseOperator,
+ BaseOperatorLink,
+ XCom,
+)
if TYPE_CHECKING:
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context
-if AIRFLOW_V_3_0_PLUS:
- from airflow.sdk import BaseOperatorLink
- from airflow.sdk.execution_time.xcom import XCom
-else:
- from airflow.models import XCom # type: ignore[no-redef]
- from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]
-
class DbtCloudRunJobOperatorLink(BaseOperatorLink):
"""Allows users to monitor the triggered job run directly in dbt Cloud."""
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
index 48d122b6696..940fa8385a5 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
@@ -33,3 +33,19 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import BaseOperator, BaseOperatorLink, BaseSensorOperator
+ from airflow.sdk.execution_time.xcom import XCom
+else:
+ from airflow.models import BaseOperator, XCom
+ from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]
+ from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]
+
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "BaseOperator",
+ "BaseSensorOperator",
+ "BaseOperatorLink",
+ "XCom",
+]