fedemgp opened a new issue, #63760: URL: https://github.com/apache/airflow/issues/63760
### Apache Airflow Provider(s) databricks ### Versions of Apache Airflow Providers 7.9.1 ### Apache Airflow version 3.0.x ### Operating System macOS Tahoe 26.3.1 ### Deployment Amazon (AWS) MWAA ### Deployment details We are in a current migration from MWAA Airflow 2.10.3 to Airflow 3.0.6. The deployment is being done using MWAA template, the only special thing to mention is the modified constraint file and the requirements.txt added here [constraints-dev.txt](https://github.com/user-attachments/files/26037955/constraints-dev.txt) [requirements-dev.txt](https://github.com/user-attachments/files/26037956/requirements-dev.txt) ### What happened When running a DatabricksTaskOperator (or any operator that creates a DatabricksExecutionTrigger) with deferrable=True in Airflow 3, the trigger fails immediately with: ``` RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. ``` The entire stacktrace can be seen below: ``` [2026-03-16, 17:35:47] ERROR - Trigger failed: Traceback (most recent call last): File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers result = details["task"].result() ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger async for event in trigger.run(): File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run run_state = await self.hook.a_get_run_state(self.run_id) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 533, in a_get_run_state response = await self._a_do_api_call(GET_RUN_ENDPOINT, json) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 744, in _a_do_api_call url = self._endpoint_url(full_endpoint) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 654, in _endpoint_url port = f":{self.databricks_conn.port}" if self.databricks_conn.port else "" ^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/functools.py", line 998, in __get__ val = self.func(instance) ^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 145, in databricks_conn return self.get_connection(self.databricks_conn_id) # type: ignore[return-value] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection conn = Connection.get_connection_from_secrets(conn_id) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets conn = TaskSDKConnection.get(conn_id=conn_id) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get return _get_connection(conn_id) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send return async_to_sync(self.asend)(msg) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__ raise RuntimeError( RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. ``` ### What you think should happen instead _No response_ ### How to reproduce Prerequisites: Airflow 3.0.x with a working Databricks connection (databricks_default). # dags/test_deferrable_databricks.py from datetime import datetime, timezone from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator with DAG( dag_id="test_deferrable_databricks", start_date=datetime(2024, 1, 1, tzinfo=timezone.utc), schedule=None, catchup=False, ): DatabricksTaskOperator( task_id="test_task", databricks_conn_id="databricks_default", deferrable=True, # <-- triggers the bug task_config={ "spark_python_task": { "python_file": "dbfs:/some/script.py", }, }, new_cluster={ "spark_version": "13.3.x-scala2.12", "node_type_id": "i3.xlarge", "num_workers": 1, }, ) ### Anything else _No response_ ### Are you willing to submit PR? - [x] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
