vatsrahul1001 opened a new issue, #57145:
URL: https://github.com/apache/airflow/issues/57145
### Apache Airflow version
3.1.0
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
I’m getting a “connection not found” error with the async (deferrable) DAG in
3.1.1rc1 when using a connection created from the UI. The same DAG works fine
with `deferrable=False`, or when the connection is set via environment variables.
**Error logs**
```
[2025-10-23 14:49:13] ERROR - Trigger
example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID
1) exited with error The conn_id `databricks_default` isn't defined
loc=triggerer_job_runner.py:1001
AirflowNotFoundException: The conn_id `databricks_default` isn't defined
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py",
line 992 in cleanup_finished_triggers
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116
in greenback_shim
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201
in _greenback_shim
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81
in trampoline
File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185
in send
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py",
line 1106 in run_trigger
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py",
line 90 in run
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py",
line 524 in a_get_run_state
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 713 in _a_do_api_call
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 623 in _endpoint_url
File "/usr/python/lib/python3.10/functools.py", line 981 in __get__
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 142 in databricks_conn
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61 in
get_connection
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line
226 in get
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line
172 in _get_connection
[2025-10-23 14:49:13] ERROR - Trigger exited without sending an event.
Dependent tasks will be failed.
name=example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1
(ID 1) loc=triggerer_job_runner.py:1016
[2025-10-23 14:49:14] INFO - DAG bundles loaded: dags-folder
source=airflow.dag_processing.bundles.manager.DagBundlesManager
loc=manager.py:179
[2025-10-23 14:49:14] INFO - Filling up the DagBag from
/files/dags/example_databricks.py source=airflow.models.dagbag.DagBag
loc=dagbag.py:593
[2025-10-23 14:49:14] WARNING - The `airflow.utils.timezone.datetime`
attribute is deprecated. Please use `'airflow.sdk.timezone.datetime'`.
category=DeprecatedImportWarning source=py.warnings
loc=/files/dags/example_databricks.py:7
[2025-10-23 14:49:14] ERROR - Trigger failed:
Traceback (most recent call last):
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py",
line 992, in cleanup_finished_triggers
result = details["task"].result()
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line
116, in greenback_shim
return await _greenback_shim(orig_coro, next_send) # type: ignore
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line
201, in _greenback_shim
next_yield, resume_greenlet = resume_greenlet.switch(next_send)
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line
81, in trampoline
next_yield: Any = next_send.send(orig_coro) # type: ignore
File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line
185, in send
return gen.send(self.value)
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py",
line 1106, in run_trigger
async for event in trigger.run():
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py",
line 90, in run
run_state = await self.hook.a_get_run_state(self.run_id)
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py",
line 524, in a_get_run_state
response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 713, in _a_do_api_call
url = self._endpoint_url(full_endpoint)
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 623, in _endpoint_url
port = f":{self.databricks_conn.port}" if self.databricks_conn.port else
""
File "/usr/python/lib/python3.10/functools.py", line 981, in __get__
val = self.func(instance)
File
"/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py",
line 142, in databricks_conn
return self.get_connection(self.databricks_conn_id) # type:
ignore[return-value]
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61, in
get_connection
conn = Connection.get(conn_id)
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py",
line 226, in get
return _get_connection(conn_id)
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py",
line 172, in _get_connection
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id
`databricks_default` isn't defined
```
### What you think should happen instead?
_No response_
### How to reproduce
1. Create a `databricks_default` connection from the UI.
2. Execute the DAG below.
```
import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
# Notebook path as a Json object
notebook_task = '{"notebook_path": "/Shared/Notebook_1"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True,
    )
    # [END howto_operator_databricks_submit_run_async]
    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True
    )
    # [END howto_operator_databricks_run_now_async]
    opr_submit_run >> opr_run_now
```
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org