praveenkumarcd opened a new issue, #61122:
URL: https://github.com/apache/airflow/issues/61122

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-databricks==7.7.1
   
   ### Apache Airflow version
   
   3.0.6
   
   ### Operating System
   
   macOS
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   We are running Amazon MWAA with Airflow 3.0.6. The provider versions follow the constraints file for that release.
   
   ### What happened
   
   The DAG defines the task as follows:
   
   task = DatabricksSubmitRunOperator.partial(
       task_id=JOB_TASK_ID,
       databricks_conn_id="dev_databricks_conn_sample",
       new_cluster=CLUSTER,
       libraries=LIBRARIES,
       deferrable=True,
       dag=dag,
       access_control_list=ACL,
   ).expand_kwargs(kwargs=fn_get_market_list())
   
   
   The Airflow logs show the following error:
   
   ERROR - Trigger failed:
   Traceback (most recent call last):
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger
       async for event in trigger.run():
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run
       run_state = await self.hook.a_get_run_state(self.run_id)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 514, in a_get_run_state
       response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
       url = self._endpoint_url(full_endpoint)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
       port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
                                                 ^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
       val = self.func(instance)
             ^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
       return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets
       conn = TaskSDKConnection.get(conn_id=conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get
       return _get_connection(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection
       msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send
       return async_to_sync(self.asend)(msg)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__
       raise RuntimeError(
   
   RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
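
As far as I can tell from the traceback, the trigger's async `run()` ends up in a synchronous connection lookup (`get_connection`), which then tries to drive async code from the thread that is already running the event loop. The sketch below reproduces the same class of failure with only the standard library. It is an analogy, not Airflow or asgiref code: `fetch_connection`, `get_connection_sync` and the `trigger_*` functions are hypothetical names, and `asyncio.run` stands in for `async_to_sync`.

```python
import asyncio


async def fetch_connection(conn_id: str) -> dict:
    # Hypothetical async lookup standing in for the supervisor round trip.
    await asyncio.sleep(0)
    return {"conn_id": conn_id, "port": None}


def get_connection_sync(conn_id: str) -> dict:
    # Sync wrapper that drives the coroutine on its own event loop,
    # similar in spirit to what async_to_sync does.
    coro = fetch_connection(conn_id)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning before re-raising
        raise


async def trigger_calling_sync() -> str:
    # Calling the sync wrapper on the event-loop thread fails, which is
    # the pattern the traceback above seems to show.
    try:
        get_connection_sync("dev_databricks_conn_sample")
    except RuntimeError as exc:
        return f"failed: {exc}"
    return "ok"


async def trigger_awaiting() -> dict:
    # Option 1: stay async and await the coroutine directly.
    return await fetch_connection("dev_databricks_conn_sample")


async def trigger_using_thread() -> dict:
    # Option 2: push the sync wrapper onto a worker thread, which has
    # no running event loop of its own.
    return await asyncio.to_thread(get_connection_sync, "dev_databricks_conn_sample")
```

Running `asyncio.run(trigger_calling_sync())` returns a string starting with `failed:`, while the other two variants return the connection dict. Whether the provider should await an async lookup or move the sync call off the loop thread is a question for the maintainers; this only illustrates why the current call path raises.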
   
   ### What you think should happen instead
   
   The task should be deferred and resume normally instead of failing in the triggerer.
   
   ### How to reproduce
   
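   Run a mapped `DatabricksSubmitRunOperator` task with `deferrable=True`, as in the snippet above. Is there a fix or workaround?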
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

