This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d2efe66d2b Handle var/conn not-found cases better when defined at 
top-level (#47863)
0d2efe66d2b is described below

commit 0d2efe66d2bdd70fadc6edb54be372235bbd778a
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Mar 17 22:16:59 2025 +0530

    Handle var/conn not-found cases better when defined at top-level (#47863)
    
    
    closes: https://github.com/apache/airflow/issues/47862
    
    
    When a var/conn is defined at top level in a dag and it doesn't exist, the 
error is unhelpful. Fixing that to handle that case better.
    
    Used the same example as in the linked issue, but the error is clearer now:
    ```
    Traceback (most recent call last):
      File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/variable.py", 
line 50, in get
        return _get_variable(key, deserialize_json=deserialize_json).value
      File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", 
line 108, in _get_variable
        raise AirflowRuntimeError(msg)
    airflow.sdk.exceptions.AirflowRuntimeError: VARIABLE_NOT_FOUND: {'key': 
'hi'}
    ```
---
 airflow/dag_processing/processor.py    | 10 +++++--
 tests/dag_processing/test_processor.py | 51 ++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index 889f9b232ce..088843f463e 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -33,7 +33,13 @@ from airflow.callbacks.callback_requests import (
 )
 from airflow.configuration import conf
 from airflow.models.dagbag import DagBag
-from airflow.sdk.execution_time.comms import ConnectionResult, GetConnection, 
GetVariable, VariableResult
+from airflow.sdk.execution_time.comms import (
+    ConnectionResult,
+    ErrorResponse,
+    GetConnection,
+    GetVariable,
+    VariableResult,
+)
 from airflow.sdk.execution_time.supervisor import WatchedSubprocess
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 from airflow.stats import Stats
@@ -52,7 +58,7 @@ ToManager = Annotated[
 ]
 
 ToDagProcessor = Annotated[
-    Union["DagFileParseRequest", ConnectionResult, VariableResult],
+    Union["DagFileParseRequest", ConnectionResult, VariableResult, 
ErrorResponse],
     Field(discriminator="type"),
 ]
 
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index e920f1c3311..ca9670f81ae 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -156,6 +156,31 @@ class TestDagFileProcessor:
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "test_abc"
 
+    def test_top_level_variable_access_not_found(
+        self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: 
pytest.MonkeyPatch
+    ):
+        logger_filehandle = MagicMock()
+
+        def dag_in_a_fn():
+            from airflow.sdk import DAG, Variable
+
+            with DAG(f"test_{Variable.get('myvar')}"):
+                ...
+
+        path = write_dag_in_a_fn_to_file(dag_in_a_fn, tmp_path)
+        proc = DagFileProcessorProcess.start(
+            id=1, path=path, bundle_path=tmp_path, callbacks=[], 
logger_filehandle=logger_filehandle
+        )
+
+        while not proc.is_ready:
+            proc._service_subprocess(0.1)
+
+        result = proc.parsing_result
+        assert result is not None
+        assert result.import_errors != {}
+        if result.import_errors:
+            assert "VARIABLE_NOT_FOUND" in 
next(iter(result.import_errors.values()))
+
     def test_top_level_connection_access(self, tmp_path: pathlib.Path, 
monkeypatch: pytest.MonkeyPatch):
         logger_filehandle = MagicMock()
 
@@ -181,6 +206,32 @@ class TestDagFileProcessor:
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "test_my_conn"
 
+    def test_top_level_connection_access_not_found(
+        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
+    ):
+        logger_filehandle = MagicMock()
+
+        def dag_in_a_fn():
+            from airflow.hooks.base import BaseHook
+            from airflow.sdk import DAG
+
+            with 
DAG(f"test_{BaseHook.get_connection(conn_id='my_conn').conn_id}"):
+                ...
+
+        path = write_dag_in_a_fn_to_file(dag_in_a_fn, tmp_path)
+        proc = DagFileProcessorProcess.start(
+            id=1, path=path, bundle_path=tmp_path, callbacks=[], 
logger_filehandle=logger_filehandle
+        )
+
+        while not proc.is_ready:
+            proc._service_subprocess(0.1)
+
+        result = proc.parsing_result
+        assert result is not None
+        assert result.import_errors != {}
+        if result.import_errors:
+            assert "CONNECTION_NOT_FOUND" in 
next(iter(result.import_errors.values()))
+
 
 def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> 
pathlib.Path:
     # Create the dag in a fn, and use inspect.getsource to write it to a file 
so that

Reply via email to