This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9c5d085c44 Replace `models.BaseOperator` to Task SDK one for DBT & Databricks (#52377)
f9c5d085c44 is described below

commit f9c5d085c445078403279de6e34deac86437d736
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Jun 28 18:37:46 2025 +0530

    Replace `models.BaseOperator` to Task SDK one for DBT & Databricks (#52377)
---
 .../airflow/providers/databricks/operators/databricks.py |  3 +--
 .../providers/databricks/operators/databricks_repos.py   |  2 +-
 .../providers/databricks/operators/databricks_sql.py     |  2 +-
 .../databricks/operators/databricks_workflow.py          |  3 +--
 .../src/airflow/providers/databricks/version_compat.py   | 10 ++++++++++
 .../src/airflow/providers/dbt/cloud/operators/dbt.py     | 14 +++++---------
 .../src/airflow/providers/dbt/cloud/version_compat.py    | 16 ++++++++++++++++
 7 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 95d5783de0a..e341daaa8a0 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -29,7 +29,6 @@ from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.databricks.hooks.databricks import (
     DatabricksHook,
     RunLifeCycleState,
@@ -49,7 +48,7 @@ from airflow.providers.databricks.triggers.databricks import (
 )
 from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event
 from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin
-from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
 
 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
index 75e6f9f8f9f..2d5f6db8404 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py
@@ -26,8 +26,8 @@ from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.databricks.hooks.databricks import DatabricksHook
+from airflow.providers.databricks.version_compat import BaseOperator
 
 if TYPE_CHECKING:
     try:
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
index 5e0f9cc7d24..5d9b08a6626 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py
@@ -28,9 +28,9 @@ from typing import TYPE_CHECKING, Any, ClassVar
 from databricks.sql.utils import ParamEscaper
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
+from airflow.providers.databricks.version_compat import BaseOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 9f714fa06d9..21000494402 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -26,14 +26,13 @@ from typing import TYPE_CHECKING, Any
 from mergedeep import merge
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState
 from airflow.providers.databricks.plugins.databricks_workflow import (
     WorkflowJobRepairAllFailedLink,
     WorkflowJobRunLink,
     store_databricks_job_run_link,
 )
-from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
 from airflow.utils.task_group import TaskGroup
 
 if TYPE_CHECKING:
diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py
index 48d122b6696..e7a259afb35 100644
--- a/providers/databricks/src/airflow/providers/databricks/version_compat.py
+++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py
@@ -33,3 +33,13 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "BaseOperator",
+]
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 1a77ad1d0ca..882cd9073c0 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -24,8 +24,6 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
-from airflow.models import BaseOperator
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.dbt.cloud.hooks.dbt import (
     DbtCloudHook,
     DbtCloudJobRunException,
@@ -34,18 +32,16 @@ from airflow.providers.dbt.cloud.hooks.dbt import (
 )
 from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
 from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
+from airflow.providers.dbt.cloud.version_compat import (
+    BaseOperator,
+    BaseOperatorLink,
+    XCom,
+)
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.context import Context
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperatorLink
-    from airflow.sdk.execution_time.xcom import XCom
-else:
-    from airflow.models import XCom  # type: ignore[no-redef]
-    from airflow.models.baseoperatorlink import BaseOperatorLink  # type: ignore[no-redef]
-
 
 class DbtCloudRunJobOperatorLink(BaseOperatorLink):
     """Allows users to monitor the triggered job run directly in dbt Cloud."""
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
index 48d122b6696..940fa8385a5 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
@@ -33,3 +33,19 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseOperator, BaseOperatorLink, BaseSensorOperator
+    from airflow.sdk.execution_time.xcom import XCom
+else:
+    from airflow.models import BaseOperator, XCom
+    from airflow.models.baseoperatorlink import BaseOperatorLink  # type: ignore[no-redef]
+    from airflow.sensors.base import BaseSensorOperator  # type: ignore[no-redef]
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "BaseOperator",
+    "BaseSensorOperator",
+    "BaseOperatorLink",
+    "XCom",
+]

Reply via email to