vatsrahul1001 opened a new issue, #57145:
URL: https://github.com/apache/airflow/issues/57145

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   On 3.1.1rc1, a deferrable (async) task that uses a UI-created connection fails in the triggerer with a “connection not found” error. The same DAG works fine with `deferrable=False`, or when the connection is set via environment variables.
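
For reference, the environment-variable route mentioned above can be sketched roughly as follows. Airflow resolves connections from variables named `AIRFLOW_CONN_<CONN_ID>` (upper-cased), and the value may be JSON. The helper name `connection_env_var`, the host, and the token are placeholders made up for illustration, and the field layout (token in `password`) is an assumption about how the Databricks connection is configured.

```python
import json
import os


def connection_env_var(conn_id: str, **fields) -> tuple[str, str]:
    """Build the (name, value) pair for an environment-variable connection.

    Hypothetical helper: Airflow looks up AIRFLOW_CONN_<CONN_ID> (upper-cased)
    and accepts a JSON document as the value.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(fields)
    return name, value


# Placeholder host and token; substitute real workspace values.
name, value = connection_env_var(
    "databricks_default",
    conn_type="databricks",
    host="https://example.cloud.databricks.com",
    password="<personal-access-token>",
)

# The variable has to be visible to the triggerer process as well as the workers.
os.environ[name] = value
print(name)
```

In a Helm deployment the same name/value pair would normally be injected through the chart's environment settings rather than set from Python.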
   
   **Error logs**
   
   ```
   [2025-10-23 14:49:13] ERROR - Trigger example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) exited with error The conn_id `databricks_default` isn't defined loc=triggerer_job_runner.py:1001
   AirflowNotFoundException: The conn_id `databricks_default` isn't defined
   
   File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992 in cleanup_finished_triggers
   
   File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116 in greenback_shim
   
   File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201 in _greenback_shim
   
   File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81 in trampoline
   
   File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185 in send
   
   File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106 in run_trigger
   
   File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90 in run
   
   File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524 in a_get_run_state
   
   File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713 in _a_do_api_call
   
   File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623 in _endpoint_url
   
   File "/usr/python/lib/python3.10/functools.py", line 981 in __get__
   
   File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142 in databricks_conn
   
   File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61 in get_connection
   
   File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226 in get
   
   File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172 in _get_connection
   [2025-10-23 14:49:13] ERROR - Trigger exited without sending an event. Dependent tasks will be failed. name=example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) loc=triggerer_job_runner.py:1016
   [2025-10-23 14:49:14] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:179
   [2025-10-23 14:49:14] INFO - Filling up the DagBag from /files/dags/example_databricks.py source=airflow.models.dagbag.DagBag loc=dagbag.py:593
   [2025-10-23 14:49:14] WARNING - The `airflow.utils.timezone.datetime` attribute is deprecated. Please use `'airflow.sdk.timezone.datetime'`. category=DeprecatedImportWarning source=py.warnings loc=/files/dags/example_databricks.py:7
   [2025-10-23 14:49:14] ERROR - Trigger failed:
   Traceback (most recent call last):
   
     File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992, in cleanup_finished_triggers
       result = details["task"].result()
   
     File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116, in greenback_shim
       return await _greenback_shim(orig_coro, next_send)  # type: ignore
   
     File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201, in _greenback_shim
       next_yield, resume_greenlet = resume_greenlet.switch(next_send)
   
     File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81, in trampoline
       next_yield: Any = next_send.send(orig_coro)  # type: ignore
   
     File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185, in send
       return gen.send(self.value)
   
     File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106, in run_trigger
       async for event in trigger.run():
   
     File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90, in run
       run_state = await self.hook.a_get_run_state(self.run_id)
   
     File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524, in a_get_run_state
       response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
   
     File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
       url = self._endpoint_url(full_endpoint)
   
     File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
       port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
   
     File "/usr/python/lib/python3.10/functools.py", line 981, in __get__
       val = self.func(instance)
   
     File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
       return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
   
     File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61, in get_connection
       conn = Connection.get(conn_id)
   
     File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226, in get
       return _get_connection(conn_id)
   
     File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172, in _get_connection
       raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
   
   airflow.exceptions.AirflowNotFoundException: The conn_id `databricks_default` isn't defined
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Create a `databricks_default` connection from the UI.
   2. Run the DAG below.
   
   ```
   import json
   import os
   from datetime import timedelta
   from typing import Dict, Optional
   
   from airflow.models.dag import DAG
   from airflow.utils.timezone import datetime
   
   from airflow.providers.databricks.operators.databricks import (
       DatabricksRunNowOperator,
       DatabricksSubmitRunOperator,
   )
   
   DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
   # Notebook path as a JSON object
   notebook_task = '{"notebook_path": "/Shared/Notebook_1"}'
   NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
   notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
   EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
   
   default_args = {
       "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
       "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
       "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
   }
   
   new_cluster = {
       "num_workers": 1,
       "spark_version": "10.4.x-scala2.12",
       "spark_conf": {},
       "azure_attributes": {
           "availability": "ON_DEMAND_AZURE",
           "spot_bid_max_price": -1,
       },
       "node_type_id": "Standard_D3_v2",
       "ssh_public_keys": [],
       "custom_tags": {},
       "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
       "cluster_source": "JOB",
       "init_scripts": [],
   }
   
   
   with DAG(
       dag_id="example_async_databricks",
       start_date=datetime(2022, 1, 1),
       schedule=None,
       catchup=False,
       default_args=default_args,
       tags=["example", "async", "databricks"],
   ) as dag:
       # [START howto_operator_databricks_submit_run_async]
       opr_submit_run = DatabricksSubmitRunOperator(
           task_id="submit_run",
           databricks_conn_id=DATABRICKS_CONN_ID,
           new_cluster=new_cluster,
           notebook_task=NOTEBOOK_TASK,
           do_xcom_push=True,
           deferrable=True,
       )
       # [END howto_operator_databricks_submit_run_async]
   
       # [START howto_operator_databricks_run_now_async]
       opr_run_now = DatabricksRunNowOperator(
           task_id="run_now",
           databricks_conn_id=DATABRICKS_CONN_ID,
           job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
           notebook_params=notebook_params,
           deferrable=True
       )
       # [END howto_operator_databricks_run_now_async]
   
   opr_submit_run >> opr_run_now
   ```
   
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


