This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d2efe66d2b Handle var/conn not-found cases better when defined at
top-level (#47863)
0d2efe66d2b is described below
commit 0d2efe66d2bdd70fadc6edb54be372235bbd778a
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Mar 17 22:16:59 2025 +0530
Handle var/conn not-found cases better when defined at top-level (#47863)
closes: https://github.com/apache/airflow/issues/47862
When a var/conn is defined at top level in a DAG and it doesn't exist, the
error message is unhelpful. Fixing that to handle that case better.
Used the same example as in the linked issue, but the error is better now:
```
Traceback (most recent call last):
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/variable.py",
line 50, in get
return _get_variable(key, deserialize_json=deserialize_json).value
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py",
line 108, in _get_variable
raise AirflowRuntimeError(msg)
airflow.sdk.exceptions.AirflowRuntimeError: VARIABLE_NOT_FOUND: {'key':
'hi'}
```
---
airflow/dag_processing/processor.py | 10 +++++--
tests/dag_processing/test_processor.py | 51 ++++++++++++++++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 889f9b232ce..088843f463e 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -33,7 +33,13 @@ from airflow.callbacks.callback_requests import (
)
from airflow.configuration import conf
from airflow.models.dagbag import DagBag
-from airflow.sdk.execution_time.comms import ConnectionResult, GetConnection,
GetVariable, VariableResult
+from airflow.sdk.execution_time.comms import (
+ ConnectionResult,
+ ErrorResponse,
+ GetConnection,
+ GetVariable,
+ VariableResult,
+)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
from airflow.serialization.serialized_objects import LazyDeserializedDAG,
SerializedDAG
from airflow.stats import Stats
@@ -52,7 +58,7 @@ ToManager = Annotated[
]
ToDagProcessor = Annotated[
- Union["DagFileParseRequest", ConnectionResult, VariableResult],
+ Union["DagFileParseRequest", ConnectionResult, VariableResult,
ErrorResponse],
Field(discriminator="type"),
]
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index e920f1c3311..ca9670f81ae 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -156,6 +156,31 @@ class TestDagFileProcessor:
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "test_abc"
+ def test_top_level_variable_access_not_found(
+ self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch:
pytest.MonkeyPatch
+ ):
+ logger_filehandle = MagicMock()
+
+ def dag_in_a_fn():
+ from airflow.sdk import DAG, Variable
+
+ with DAG(f"test_{Variable.get('myvar')}"):
+ ...
+
+ path = write_dag_in_a_fn_to_file(dag_in_a_fn, tmp_path)
+ proc = DagFileProcessorProcess.start(
+ id=1, path=path, bundle_path=tmp_path, callbacks=[],
logger_filehandle=logger_filehandle
+ )
+
+ while not proc.is_ready:
+ proc._service_subprocess(0.1)
+
+ result = proc.parsing_result
+ assert result is not None
+ assert result.import_errors != {}
+ if result.import_errors:
+ assert "VARIABLE_NOT_FOUND" in
next(iter(result.import_errors.values()))
+
def test_top_level_connection_access(self, tmp_path: pathlib.Path,
monkeypatch: pytest.MonkeyPatch):
logger_filehandle = MagicMock()
@@ -181,6 +206,32 @@ class TestDagFileProcessor:
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "test_my_conn"
+ def test_top_level_connection_access_not_found(
+ self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
+ ):
+ logger_filehandle = MagicMock()
+
+ def dag_in_a_fn():
+ from airflow.hooks.base import BaseHook
+ from airflow.sdk import DAG
+
+ with
DAG(f"test_{BaseHook.get_connection(conn_id='my_conn').conn_id}"):
+ ...
+
+ path = write_dag_in_a_fn_to_file(dag_in_a_fn, tmp_path)
+ proc = DagFileProcessorProcess.start(
+ id=1, path=path, bundle_path=tmp_path, callbacks=[],
logger_filehandle=logger_filehandle
+ )
+
+ while not proc.is_ready:
+ proc._service_subprocess(0.1)
+
+ result = proc.parsing_result
+ assert result is not None
+ assert result.import_errors != {}
+ if result.import_errors:
+ assert "CONNECTION_NOT_FOUND" in
next(iter(result.import_errors.values()))
+
def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) ->
pathlib.Path:
# Create the dag in a fn, and use inspect.getsource to write it to a file
so that