bob-skowron opened a new pull request, #63611:
URL: https://github.com/apache/airflow/pull/63611

   This is the same implementation as #55568, but that one seems to be having 
difficulty getting closed out, so I opened this one. Happy to continue this 
route. If the maintainers prefer to close this in favor of the original, I 
understand.
   
   I bumped into this error when running the DatabricksSubmitRunOperator on 
Airflow 3.0.6 using apache-airflow-providers-databricks==7.7.1:
   
   ERROR - Trigger failed:
   Traceback (most recent call last):
   
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger
       async for event in trigger.run():
   
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run
       run_state = await self.hook.a_get_run_state(self.run_id)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 514, in a_get_run_state
       response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
       url = self._endpoint_url(full_endpoint)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
       port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
                                                 ^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
       val = self.func(instance)
             ^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
       return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets
       conn = TaskSDKConnection.get(conn_id=conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get
       return _get_connection(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection
       msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send
       return async_to_sync(self.asend)(msg)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/site-packages/asgiref/sync.py", line 187, in __call__
       raise RuntimeError(
   
   RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
   : source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator"
   [2025-09-11, 15:56:36] ERROR - Task failed with exception: source="task"
   TaskDeferralError: Trigger failure
   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
   
   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task
   
   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1603 in resume_execution
   
   Searching for the key message "RuntimeError: You cannot use AsyncToSync in 
the same thread as an async event loop - just await the async function 
directly." led me to several related issues/PRs:
   
       https://github.com/apache/airflow/issues/54350
       https://github.com/apache/airflow/issues/53447
       https://github.com/apache/airflow/issues/55632
       https://github.com/apache/airflow/pull/55094
       https://github.com/apache/airflow/pull/55179
       https://github.com/apache/airflow/pull/54598
   
   I didn't test the exact version in which deferrable mode on the 
DatabricksSubmitRunOperator broke, but I believe it's Airflow 3.0.3.
   
   This PR adds an async version of the databricks_conn method and changes all 
async methods to use this new a_databricks_conn method for fetching the 
connection.
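
A minimal sketch of the idea, not the actual diff. It uses only the standard library, so `FakeConnection`, `fetch_connection_sync`, `fetch_connection_async`, `SketchHook`, `a_endpoint_url`, and the example endpoint path are all hypothetical stand-ins for the Airflow pieces in the traceback above. The sync helper refuses to run inside a running event loop, which mimics the failure mode; the async accessor awaits the lookup instead.

```python
import asyncio
from functools import cached_property


class FakeConnection:
    """Stand-in for an Airflow Connection (hypothetical, for illustration only)."""

    def __init__(self, host, port=None):
        self.host = host
        self.port = port


async def fetch_connection_async(conn_id):
    # Hypothetical async lookup; stands in for asking the supervisor for a connection.
    await asyncio.sleep(0)
    return FakeConnection(host=f"{conn_id}.example.com")


def fetch_connection_sync(conn_id):
    # Mimics a sync-over-async bridge: it cannot block inside a running event loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so driving the coroutine here is safe.
        return asyncio.run(fetch_connection_async(conn_id))
    raise RuntimeError("cannot block on an async call inside a running event loop")


class SketchHook:
    def __init__(self, conn_id):
        self.conn_id = conn_id
        self._a_conn = None

    @cached_property
    def databricks_conn(self):
        # Sync path: fine in a worker, fails when touched from async code.
        return fetch_connection_sync(self.conn_id)

    async def a_databricks_conn(self):
        # Async path: await the lookup and cache the result on the instance.
        if self._a_conn is None:
            self._a_conn = await fetch_connection_async(self.conn_id)
        return self._a_conn

    async def a_endpoint_url(self, endpoint):
        # Async methods go through a_databricks_conn instead of databricks_conn.
        conn = await self.a_databricks_conn()
        port = f":{conn.port}" if conn.port else ""
        return f"https://{conn.host}{port}/{endpoint.lstrip('/')}"
```

Calling `asyncio.run(SketchHook("dbx").a_endpoint_url("/api/example"))` goes through the awaited path, while reading `databricks_conn` from inside a coroutine raises the `RuntimeError` in this sketch.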
   
   Tested by fixing all tests. I don't have a real Databricks instance to test 
against, so I also tested this locally: I monkeypatched several calls in the 
DatabricksHook and BaseDatabricksHook until the AsyncToSync error was reached, 
then applied the changes from this PR. After that, a different error was 
reached instead, caused by the lack of connectivity to a real Databricks 
instance.
   
   Also: mypy was complaining about several usernames/passwords being None 
where a string was expected. I learned that an empty username/password is valid 
according to [RFC 
2617](https://www.ietf.org/rfc/rfc2617.txt#:~:text=user%2Dpass%20%20%20%3D%20userid%20%22%3A%22%20password%0A%20%20%20%20%20%20userid%20%20%20%20%20%20%3D%20*%3CTEXT%20excluding%20%22%3A%22%3E%0A%20%20%20%20%20%20password%20%20%20%20%3D%20*TEXT),
 so I decided to default to "" in case it's None.
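
As a rough illustration of that defaulting (a sketch, not the code in this PR), the hypothetical helper `basic_auth_header` below builds a Basic `user-pass` value as `userid ":" password` and substitutes an empty string for any missing part. The function name and signature are assumptions made for this example.

```python
import base64
from typing import Optional


def basic_auth_header(login: Optional[str], password: Optional[str]) -> str:
    """Build a Basic Authorization header value (hypothetical helper)."""
    # An empty userid or password is allowed, so None falls back to "".
    userid = login or ""
    secret = password or ""
    raw = f"{userid}:{secret}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")
```

With this shape, callers that expect `str` values never see `None`, which is what the mypy complaint was about.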

