BasPH opened a new pull request, #55568:
URL: https://github.com/apache/airflow/pull/55568

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   I bumped into this error when running the DatabricksSubmitRunOperator on Airflow 3.0.6 using apache-airflow-providers-databricks==7.7.1:
    ```
    ERROR - Trigger failed:
    Traceback (most recent call last):

      File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers
        result = details["task"].result()
                 ^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger
        async for event in trigger.run():

      File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run
        run_state = await self.hook.a_get_run_state(self.run_id)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 514, in a_get_run_state
        response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
        url = self._endpoint_url(full_endpoint)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
        port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
                                                  ^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
        val = self.func(instance)
              ^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
        return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
        conn = Connection.get_connection_from_secrets(conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets
        conn = TaskSDKConnection.get(conn_id=conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get
        return _get_connection(conn_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection
        msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send
        return async_to_sync(self.asend)(msg)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      File "/usr/local/lib/python3.12/site-packages/asgiref/sync.py", line 187, in __call__
        raise RuntimeError(

    RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
    : source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator"
    [2025-09-11, 15:56:36] ERROR - Task failed with exception: source="task"
    TaskDeferralError: Trigger failure
    File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run

    File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task

    File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1603 in resume_execution
    ```
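The last frames show the problem: the trigger's async `run()` ends up in the synchronous `databricks_conn` property, which fetches the connection through `async_to_sync` while the triggerer's event loop is already running on that thread. A small stdlib-only mimic of asgiref's guard (the `sync_bridge`/`get_conn` names are hypothetical, not Airflow or asgiref APIs) reproduces the failure mode:

```python
# Stdlib-only mimic of asgiref's AsyncToSync guard; sync_bridge and get_conn
# are hypothetical names for illustration only.
import asyncio


def sync_bridge(coro_func):
    """Refuse to bridge async->sync when this thread already runs an event
    loop, mirroring the check that raises in asgiref/sync.py."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: safe to spin one up for the coroutine.
        return asyncio.run(coro_func())
    raise RuntimeError(
        "You cannot use AsyncToSync in the same thread as an async "
        "event loop - just await the async function directly."
    )


async def get_conn():
    return "databricks_default"


# From plain sync code (e.g. a worker process) the bridge works fine:
assert sync_bridge(get_conn) == "databricks_default"


# From inside a running event loop (the triggerer's situation) it raises:
async def trigger_run():
    try:
        sync_bridge(get_conn)
        return False
    except RuntimeError as exc:
        return "AsyncToSync" in str(exc)


raised_inside_loop = asyncio.run(trigger_run())
```

The fix therefore has to avoid the sync property entirely on the async path and await the connection fetch directly.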
   
   Searching for the key message `RuntimeError: You cannot use AsyncToSync in 
the same thread as an async event loop - just await the async function 
directly.` led me to several related issues/PRs:
   
   - https://github.com/apache/airflow/issues/54350
   - https://github.com/apache/airflow/issues/53447
   - https://github.com/apache/airflow/issues/55632
   - https://github.com/apache/airflow/pull/55094
   - https://github.com/apache/airflow/pull/55179
   - https://github.com/apache/airflow/pull/54598
   
   I didn't pin down the exact version in which deferrable mode on the DatabricksSubmitRunOperator broke, but I believe it was Airflow 3.0.3.
   
   This PR adds an async counterpart to the `databricks_conn` property and changes all async methods to fetch the connection via this new `a_databricks_conn` method instead.
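To make the shape of the change concrete, here is a minimal sketch of the sync-property-plus-async-accessor pattern. The `FakeConnection` class and the two fetch stubs stand in for the real Task SDK calls, and all names here are illustrative; the actual PR diff is authoritative.

```python
# Hypothetical sketch of a hook with a sync cached property and an async
# counterpart; FakeConnection and the fetch stubs are stand-ins, not real
# Airflow APIs.
import asyncio
from functools import cached_property


class FakeConnection:
    def __init__(self, conn_id: str):
        self.conn_id = conn_id


def fetch_connection_sync(conn_id: str) -> FakeConnection:
    # Stand-in for the sync fetch path, which breaks when it is reached
    # from inside the triggerer's running event loop.
    return FakeConnection(conn_id)


async def fetch_connection_async(conn_id: str) -> FakeConnection:
    # Stand-in for an awaitable fetch: no AsyncToSync bridge involved.
    return FakeConnection(conn_id)


class HookSketch:
    def __init__(self, databricks_conn_id: str = "databricks_default"):
        self.databricks_conn_id = databricks_conn_id
        self._cached_async_conn: FakeConnection | None = None

    @cached_property
    def databricks_conn(self) -> FakeConnection:
        # Sync path: fine in workers, unusable inside the triggerer.
        return fetch_connection_sync(self.databricks_conn_id)

    async def a_databricks_conn(self) -> FakeConnection:
        # Async methods await this accessor instead of touching the
        # sync property; the result is cached like the property is.
        if self._cached_async_conn is None:
            self._cached_async_conn = await fetch_connection_async(
                self.databricks_conn_id
            )
        return self._cached_async_conn


conn = asyncio.run(HookSketch().a_databricks_conn())
```

The key design point is that the async accessor never calls into the sync property, so nothing on the trigger's code path needs an `async_to_sync` bridge.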
   
   Tested by fixing all existing tests. I don't have a real Databricks instance to test against, so I also verified locally by monkeypatching several calls in DatabricksHook and BaseDatabricksHook until the AsyncToSync error was reproduced; after applying the changes from this PR, that error no longer occurred and execution proceeded to a different error caused only by the lack of connectivity to a real Databricks instance.
   
   Also: mypy was complaining about several usernames/passwords being typed as possibly `None` where a string was expected. I learned that an empty username/password is valid according to [RFC 2617](https://www.ietf.org/rfc/rfc2617.txt#:~:text=user%2Dpass%20%20%20%3D%20userid%20%22%3A%22%20password%0A%20%20%20%20%20%20userid%20%20%20%20%20%20%3D%20*%3CTEXT%20excluding%20%22%3A%22%3E%0A%20%20%20%20%20%20password%20%20%20%20%3D%20*TEXT), so I decided to default to `""` whenever the value is `None`.
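For illustration, this is the kind of defaulting that satisfies mypy while staying within what RFC 2617 permits (the helper name is hypothetical, not from the PR):

```python
# Illustrative only: defaulting optional credentials to "" keeps the types
# strict (str, not Optional[str]) and is valid per RFC 2617, which allows an
# empty userid and password.
import base64
from typing import Optional


def basic_auth_header(username: Optional[str], password: Optional[str]) -> str:
    user = username or ""  # None -> "" is RFC-2617-valid
    pwd = password or ""
    token = base64.b64encode(f"{user}:{pwd}".encode()).decode()
    return f"Basic {token}"


header = basic_auth_header(None, None)  # encodes just ":" -> "Basic Og=="
```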
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).

