fedemgp opened a new issue, #63760:
URL: https://github.com/apache/airflow/issues/63760

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   7.9.1
   
   ### Apache Airflow version
   
   3.0.x
   
   ### Operating System
   
   macOS Tahoe 26.3.1
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   We are currently migrating from MWAA Airflow 2.10.3 to Airflow 3.0.6. The deployment uses the standard MWAA template; the only notable customizations are the modified constraints file and the requirements.txt attached here:
   
   
[constraints-dev.txt](https://github.com/user-attachments/files/26037955/constraints-dev.txt)
   
[requirements-dev.txt](https://github.com/user-attachments/files/26037956/requirements-dev.txt)
   
   ### What happened
   
   When running a `DatabricksTaskOperator` (or any operator that creates a `DatabricksExecutionTrigger`) with `deferrable=True` in Airflow 3, the trigger fails immediately with:
   
   ```
   RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
   ```
   
   The full stack trace is below:
   
   ```
    [2026-03-16, 17:35:47] ERROR - Trigger failed:
    Traceback (most recent call last):

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers
        result = details["task"].result()
                 ^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger
        async for event in trigger.run():

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run
        run_state = await self.hook.a_get_run_state(self.run_id)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 533, in a_get_run_state
        response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 744, in _a_do_api_call
        url = self._endpoint_url(full_endpoint)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 654, in _endpoint_url
        port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
                                                  ^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
        val = self.func(instance)
              ^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 145, in databricks_conn
        return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
        conn = Connection.get_connection_from_secrets(conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets
        conn = TaskSDKConnection.get(conn_id=conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get
        return _get_connection(conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection
        msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send
        return async_to_sync(self.asend)(msg)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__
        raise RuntimeError(

    RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
   ```
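   For context, the final frame is reproducible with the standard library alone: any sync bridge that tries to drive the event loop from inside an already-running loop fails the same way `AsyncToSync` does. A minimal sketch (the `asend`/`send` names are stand-ins, not Airflow's actual implementation):
   
   ```python
   import asyncio
   
   async def asend(msg):
       # Hypothetical stand-in for the supervisor's async send.
       return f"reply to {msg}"
   
   def send(msg):
       # Mimics what async_to_sync(self.asend)(msg) must do: drive the event
       # loop synchronously. Inside an already-running loop this is illegal.
       loop = asyncio.get_running_loop()
       coro = asend(msg)
       try:
           return loop.run_until_complete(coro)
       finally:
           coro.close()  # silence "coroutine was never awaited" when the call fails
   
   async def trigger_run():
       # The Databricks trigger executes inside the triggerer's asyncio loop;
       # the synchronous Connection lookup eventually reaches send().
       try:
           return send("GetConnection")
       except RuntimeError as exc:
           return f"RuntimeError: {exc}"
   
   print(asyncio.run(trigger_run()))
   # → RuntimeError: This event loop is already running
   ```
   
   The message differs from asgiref's, but the failure mode is the same: a sync wrapper around an async call cannot run on the thread that already owns the event loop.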
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Prerequisites: Airflow 3.0.x with a working Databricks connection 
(databricks_default).
   
   ```python
   # dags/test_deferrable_databricks.py
   from datetime import datetime, timezone
   
   from airflow import DAG
   from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
   
   with DAG(
       dag_id="test_deferrable_databricks",
       start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
       schedule=None,
       catchup=False,
   ):
       DatabricksTaskOperator(
           task_id="test_task",
           databricks_conn_id="databricks_default",
           deferrable=True,  # <-- triggers the bug
           task_config={
               "spark_python_task": {
                   "python_file": "dbfs:/some/script.py",
               },
           },
           new_cluster={
               "spark_version": "13.3.x-scala2.12",
               "node_type_id": "i3.xlarge",
               "num_workers": 1,
           },
       )
   ```
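   For what it's worth, the failure disappears whenever the sync bridge checks for a running loop before spinning one. A purely illustrative sketch of that guard (not Airflow's code; `asend`/`send` are hypothetical names):
   
   ```python
   import asyncio
   
   async def asend(msg):
       # Hypothetical stand-in for the async send path.
       return f"handled {msg}"
   
   def send(msg):
       # Illustrative guard only: refuse to bridge when a loop is already
       # running in this thread, instead of failing deep inside AsyncToSync.
       try:
           asyncio.get_running_loop()
       except RuntimeError:
           # No loop running here: safe to drive one synchronously.
           return asyncio.run(asend(msg))
       raise RuntimeError("send() called from a running event loop; await asend() directly")
   
   print(send("GetConnection"))  # → handled GetConnection (called outside any loop)
   ```
   
   In the triggerer, where a loop is always running, the caller would need to `await asend(msg)` directly rather than go through the sync wrapper.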
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
