This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4588de0efd6 Lock in Databricks workflow depends_on parent-key behavior (#47614) (#66681)
4588de0efd6 is described below
commit 4588de0efd625055d62760c527b9c9facd9a95e8
Author: deepinsight coder <[email protected]>
AuthorDate: Tue Jun 2 11:24:17 2026 -0700
Lock in Databricks workflow depends_on parent-key behavior (#47614) (#66681)
* Lock in DatabricksTaskBaseOperator depends_on parent-key behavior
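The runtime fix for issue #47614 shipped in PR #48492; this PR adds
end-to-end regression coverage so the bug cannot silently regress, plus
small follow-ups in the same area: relevant_upstreams is now annotated as
list[str], and _CreateDatabricksWorkflowOperator now initializes
relevant_upstreams to an empty list instead of [task_id].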
Tests build a real DAG + DatabricksWorkflowTaskGroup with
DatabricksNotebookOperator tasks and assert depends_on payloads for the
default-key, custom-key, >100-char-key, diamond, fan-out, root-task, and
external-upstream cases. Also fixes the existing
test_convert_to_databricks_workflow_task to pass strings (not mocks) so
the depends_on branch is actually exercised, and adds a test asserting
that _generate_databricks_task_key raises ValueError when called with a
parent task_id but no task_dict.
closes: #47614
* Remove manually added databricks changelog entry
Provider changelogs are regenerated from git log by the release
manager and should not be edited by hand.
* Assert depends_on reaches the Databricks Jobs API payload
Existing TestWorkflowDependsOn coverage verified the in-process
create_workflow_json() output. The new TestWorkflowDependsOnWirePayload
class drives _CreateDatabricksWorkflowOperator._create_or_reset_job end
to end and asserts on the spec that DatabricksHook.create_job /
DatabricksHook.reset_job actually receive, i.e. the payload that lands
in /api/2.1/jobs/create and /api/2.1/jobs/reset.
Both branches (no existing job -> create_job, existing job -> reset_job)
are exercised; both assert tasks[child].depends_on == [{task_key:
md5(parent)}] and tasks[parent].depends_on == [].
* Fix Databricks provider mypy timezone imports
* Fix Databricks provider timezone import compatibility (import timezone
from airflow.providers.common.compat.sdk instead of airflow.utils)
---
.../providers/databricks/operators/databricks.py | 4 +-
.../databricks/operators/databricks_workflow.py | 2 +-
.../providers/databricks/utils/openlineage.py | 2 +-
.../unit/databricks/operators/test_databricks.py | 26 ++-
.../operators/test_databricks_workflow.py | 242 ++++++++++++++++++++-
.../sensors/test_databricks_partition.py | 3 +-
.../unit/databricks/sensors/test_databricks_sql.py | 3 +-
.../unit/databricks/utils/test_openlineage.py | 3 +-
8 files changed, 270 insertions(+), 15 deletions(-)
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index c9e7c6891fb..9898993d414 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -1425,7 +1425,7 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC):
def _convert_to_databricks_workflow_task(
self,
- relevant_upstreams: list[BaseOperator],
+ relevant_upstreams: list[str],
task_dict: dict[str, BaseOperator],
context: Context | None = None,
) -> dict[str, object]:
@@ -1679,7 +1679,7 @@ class DatabricksNotebookOperator(DatabricksTaskBaseOperator):
def _convert_to_databricks_workflow_task(
self,
- relevant_upstreams: list[BaseOperator],
+ relevant_upstreams: list[str],
task_dict: dict[str, BaseOperator],
context: Context | None = None,
) -> dict[str, object]:
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 5bbf9d3c78a..779c2fc9f15 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -158,7 +158,7 @@ class _CreateDatabricksWorkflowOperator(BaseOperator):
self.python_params = python_params or []
self.spark_submit_params = spark_submit_params or []
self.tasks_to_convert = tasks_to_convert or {}
- self.relevant_upstreams = [task_id]
+ self.relevant_upstreams: list[str] = []
self.workflow_run_metadata: WorkflowRunMetadata | None = None
super().__init__(task_id=task_id, **kwargs)
diff --git a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
index 56f4400df61..58e87a21dd7 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any
import requests
from airflow.providers.common.compat.openlineage.check import require_openlineage_version
-from airflow.utils import timezone
+from airflow.providers.common.compat.sdk import timezone
if TYPE_CHECKING:
from openlineage.client.event_v2 import RunEvent
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index a1b7b4f11b3..4684b14282c 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -2778,15 +2778,22 @@ class TestDatabricksNotebookOperator:
operator.task_group = databricks_workflow_task_group
operator.task_id = "test_task"
operator.upstream_task_ids = ["upstream_task"]
- relevant_upstreams = [MagicMock(task_id="upstream_task")]
- task_dict = {"upstream_task": MagicMock(task_id="upstream_task")}
+ upstream_task = DatabricksNotebookOperator(
+ notebook_path="/path/to/upstream",
+ source="WORKSPACE",
+ task_id="upstream_task",
+ dag=dag,
+ )
+ relevant_upstreams = ["upstream_task"]
+ task_dict = {"upstream_task": upstream_task}
task_json = operator._convert_to_databricks_workflow_task(relevant_upstreams, task_dict)
task_key = hashlib.md5(b"example_dag__test_task").hexdigest()
+ upstream_task_key = hashlib.md5(b"example_dag__upstream_task").hexdigest()
expected_json = {
"task_key": task_key,
- "depends_on": [],
+ "depends_on": [{"task_key": upstream_task_key}],
"timeout_seconds": 0,
"email_notifications": {},
"notebook_task": {
@@ -2957,6 +2964,19 @@ class TestDatabricksTaskOperator:
expected_task_key = hashlib.md5(task_key).hexdigest()
assert expected_task_key == operator.databricks_task_key
+ def test_generate_databricks_task_key_requires_task_dict_when_task_id_passed(self):
+ """Looking up a parent task's key without a ``task_dict`` is a programmer error."""
+ operator = DatabricksTaskOperator(
+ task_id="test_task",
+ databricks_conn_id="test_conn_id",
+ task_config={},
+ )
+ with pytest.raises(
+ ValueError,
+ match="Must pass task_dict if task_id is provided in _generate_databricks_task_key.",
+ ):
+ operator._generate_databricks_task_key(task_id="upstream_task")
+
def test_user_databricks_task_key(self):
task_config = {}
operator = DatabricksTaskOperator(
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
index 9cfa7e91ae3..84069ee0ff7 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import hashlib
from unittest.mock import MagicMock, patch
import pytest
@@ -28,8 +29,9 @@ pytest.importorskip("flask_session")
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
from airflow.providers.databricks.hooks.databricks import RunLifeCycleState
+from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import (
DatabricksWorkflowTaskGroup,
WorkflowRunMetadata,
@@ -37,7 +39,6 @@ from airflow.providers.databricks.operators.databricks_workflow import (
_flatten_node,
)
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2021, 1, 1)
@@ -333,3 +334,240 @@ def test_on_kill(mock_databricks_hook, context, mock_workflow_run_metadata):
operator.on_kill()
operator._hook.cancel_run.assert_called_once_with(RUN_ID)
+
+
+class TestWorkflowDependsOn:
+ """End-to-end coverage that ``depends_on`` references the *parent's* ``task_key``.
+
+ Regression coverage for issue apache/airflow#47614 (root cause fixed by #48492).
+ Each test builds a real ``DAG`` + ``DatabricksWorkflowTaskGroup`` populated with
+ real ``DatabricksNotebookOperator`` tasks (no operator mocks), then drives
+ ``_CreateDatabricksWorkflowOperator.create_workflow_json`` and asserts the
+ resulting ``tasks[*]['depends_on']`` payload.
+ """
+
+ DAG_ID = "test_depends_on_dag"
+ GROUP_ID = "wf_group"
+ CONN_ID = "databricks_conn"
+
+ @staticmethod
+ def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator:
+ return DatabricksNotebookOperator(
+ task_id=task_id,
+ notebook_path=f"/path/{task_id}",
+ source="WORKSPACE",
+ **kwargs,
+ )
+
+ def _expected_default_key(self, group_task_id: str) -> str:
+ full_task_id = f"{self.GROUP_ID}.{group_task_id}"
+ return hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest()
+
+ def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator:
+ launch = dag.task_dict[f"{self.GROUP_ID}.launch"]
+ assert isinstance(launch, _CreateDatabricksWorkflowOperator)
+ return launch
+
+ @staticmethod
+ def _tasks_by_key(workflow_json: dict) -> dict:
+ return {t["task_key"]: t for t in workflow_json["tasks"]}
+
+ def test_depends_on_uses_parent_key_default_keys(self):
+ """``task_A >> task_B`` — ``task_B.depends_on`` references ``task_A``'s key."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a")
+ task_b = self._build_notebook("task_b")
+ task_a >> task_b
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ a_key = self._expected_default_key("task_a")
+ b_key = self._expected_default_key("task_b")
+
+ assert set(tasks_by_key) == {a_key, b_key}
+ assert tasks_by_key[a_key]["depends_on"] == []
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+ def test_depends_on_uses_parent_key_custom_parent_key(self):
+ """An explicit ``databricks_task_key`` on the parent flows into ``depends_on``."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a", databricks_task_key="custom_a")
+ task_b = self._build_notebook("task_b")
+ task_a >> task_b
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ b_key = self._expected_default_key("task_b")
+
+ assert "custom_a" in tasks_by_key
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": "custom_a"}]
+
+ def test_depends_on_falls_back_to_hash_when_parent_key_too_long(self):
+ """A >100-char explicit key is rejected; both task and ``depends_on`` use the hash."""
+ too_long_key = "x" * 101
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a", databricks_task_key=too_long_key)
+ task_b = self._build_notebook("task_b")
+ task_a >> task_b
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ a_key = self._expected_default_key("task_a")
+ b_key = self._expected_default_key("task_b")
+
+ assert too_long_key not in tasks_by_key
+ assert a_key in tasks_by_key
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+ def test_depends_on_diamond_dependency(self):
+ """``A >> [B, C] >> D`` — D depends on both B and C; B and C each depend only on A."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a")
+ task_b = self._build_notebook("task_b")
+ task_c = self._build_notebook("task_c")
+ task_d = self._build_notebook("task_d")
+ task_a >> [task_b, task_c] >> task_d
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ a_key = self._expected_default_key("task_a")
+ b_key = self._expected_default_key("task_b")
+ c_key = self._expected_default_key("task_c")
+ d_key = self._expected_default_key("task_d")
+
+ assert tasks_by_key[a_key]["depends_on"] == []
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+ assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}]
+ d_parent_keys = {entry["task_key"] for entry in tasks_by_key[d_key]["depends_on"]}
+ assert d_parent_keys == {b_key, c_key}
+
+ def test_depends_on_fan_out_dependency(self):
+ """``A >> [B, C]`` — both downstreams reference A's key only."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a")
+ task_b = self._build_notebook("task_b")
+ task_c = self._build_notebook("task_c")
+ task_a >> [task_b, task_c]
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ a_key = self._expected_default_key("task_a")
+ b_key = self._expected_default_key("task_b")
+ c_key = self._expected_default_key("task_c")
+
+ assert tasks_by_key[a_key]["depends_on"] == []
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+ assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}]
+
+ def test_root_tasks_have_empty_depends_on(self):
+ """Root tasks' Airflow upstream is the launch task; that must never appear in ``depends_on``."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ root_a = self._build_notebook("root_a")
+ root_b = self._build_notebook("root_b")
+ self._build_notebook("downstream").set_upstream([root_a, root_b])
+
+ launch_task = self._launch_task(dag)
+ # Sanity: both roots actually have the launch task as an Airflow upstream.
+ for root_task_id in (f"{self.GROUP_ID}.root_a", f"{self.GROUP_ID}.root_b"):
+ assert launch_task.task_id in dag.task_dict[root_task_id].upstream_task_ids
+
+ tasks_by_key = self._tasks_by_key(launch_task.create_workflow_json())
+ root_a_key = self._expected_default_key("root_a")
+ root_b_key = self._expected_default_key("root_b")
+
+ assert tasks_by_key[root_a_key]["depends_on"] == []
+ assert tasks_by_key[root_b_key]["depends_on"] == []
+
+ def test_depends_on_filters_out_external_upstream(self):
+ """An Airflow upstream outside the workflow group must not appear in ``depends_on``."""
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ external_op = EmptyOperator(task_id="external_op")
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ dbx_task = self._build_notebook("dbx_task")
+ external_op >> dbx_task
+
+ tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+ dbx_key = self._expected_default_key("dbx_task")
+
+ assert tasks_by_key[dbx_key]["depends_on"] == []
+
+
+class TestWorkflowDependsOnWirePayload:
+ """Wire-boundary coverage: the spec sent to the Databricks Jobs API carries ``depends_on``.
+
+ :class:`TestWorkflowDependsOn` asserts the in-process ``create_workflow_json`` payload.
+ These tests assert the *wire* payload — what ``_create_or_reset_job`` actually hands to
+ ``DatabricksHook.create_job`` (new job) or ``DatabricksHook.reset_job`` (existing job),
+ which is what the Databricks REST API receives.
+ """
+
+ DAG_ID = "test_depends_on_wire_dag"
+ GROUP_ID = "wf_group"
+ CONN_ID = "databricks_conn"
+
+ @staticmethod
+ def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator:
+ return DatabricksNotebookOperator(
+ task_id=task_id,
+ notebook_path=f"/path/{task_id}",
+ source="WORKSPACE",
+ **kwargs,
+ )
+
+ def _expected_default_key(self, group_task_id: str) -> str:
+ full_task_id = f"{self.GROUP_ID}.{group_task_id}"
+ return hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest()
+
+ def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator:
+ launch = dag.task_dict[f"{self.GROUP_ID}.launch"]
+ assert isinstance(launch, _CreateDatabricksWorkflowOperator)
+ return launch
+
+ @staticmethod
+ def _tasks_by_key(workflow_json: dict) -> dict:
+ return {t["task_key"]: t for t in workflow_json["tasks"]}
+
+ def _build_two_task_dag(self) -> DAG:
+ with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag:
+ with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID):
+ task_a = self._build_notebook("task_a")
+ task_b = self._build_notebook("task_b")
+ task_a >> task_b
+ return dag
+
+ def _assert_parent_depends_on(self, job_spec: dict) -> None:
+ tasks_by_key = self._tasks_by_key(job_spec)
+ a_key = self._expected_default_key("task_a")
+ b_key = self._expected_default_key("task_b")
+
+ assert len(job_spec["tasks"]) == 2
+ assert set(tasks_by_key) == {a_key, b_key}
+ assert tasks_by_key[a_key]["depends_on"] == []
+ assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+ def test_create_job_payload_carries_parent_depends_on(self, mock_databricks_hook):
+ """No existing job → ``create_job`` receives a spec whose ``depends_on`` references the parent key."""
+ launch_task = self._launch_task(self._build_two_task_dag())
+ launch_task._hook.list_jobs.return_value = []
+ launch_task._hook.create_job.return_value = 999
+
+ launch_task._create_or_reset_job(context=MagicMock())
+
+ launch_task._hook.create_job.assert_called_once()
+ launch_task._hook.reset_job.assert_not_called()
+ (job_spec,) = launch_task._hook.create_job.call_args.args
+ self._assert_parent_depends_on(job_spec)
+
+ def test_reset_job_payload_carries_parent_depends_on(self, mock_databricks_hook):
+ """Existing job → ``reset_job`` receives a spec whose ``depends_on`` references the parent key."""
+ launch_task = self._launch_task(self._build_two_task_dag())
+ launch_task._hook.list_jobs.return_value = [{"job_id": 42}]
+
+ launch_task._create_or_reset_job(context=MagicMock())
+
+ launch_task._hook.reset_job.assert_called_once()
+ launch_task._hook.create_job.assert_not_called()
+ job_id, job_spec = launch_task._hook.reset_job.call_args.args
+ assert job_id == 42
+ self._assert_parent_depends_on(job_spec)
diff --git a/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py b/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
index 473a0d073c3..23e6d12c310 100644
--- a/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
+++ b/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
@@ -24,10 +24,9 @@ from unittest.mock import patch
import pytest
from airflow.models import DAG
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor
-from airflow.utils import timezone
TASK_ID = "db-partition-sensor"
DEFAULT_CONN_ID = "databricks_default"
diff --git a/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py b/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
index ce72e0b8316..3431722f770 100644
--- a/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
+++ b/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
@@ -24,9 +24,8 @@ from unittest.mock import patch
import pytest
from airflow.models import DAG
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
-from airflow.utils import timezone
TASK_ID = "db-sensor"
DEFAULT_CONN_ID = "databricks_default"
diff --git a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
index 10efbb7fffc..8702456ca86 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
@@ -29,7 +29,7 @@ from airflow.providers.common.compat.openlineage.facet import (
ExternalQueryRunFacet,
SQLJobFacet,
)
-from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException, timezone
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.providers.databricks.utils.openlineage import (
@@ -41,7 +41,6 @@ from airflow.providers.databricks.utils.openlineage import (
emit_openlineage_events_for_databricks_queries,
)
from airflow.providers.openlineage.conf import namespace
-from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState