This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ecbc95981213ce8178af249a891e2b3065e3f028
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Nov 27 11:07:10 2023 -0800

    Use ExitStack to manage mutation of secrets_backend_list in dag.test 
(#34620)
    
    Although it requires another indent, it's cleaner, and more importantly it 
makes sure that the mutation is undone after failure.
    
    (cherry picked from commit 99b4eb769d2a3b6692de9c0d83ba64041abf5789)
---
 airflow/models/dag.py | 103 +++++++++++++++++++++++++-------------------------
 1 file changed, 52 insertions(+), 51 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 27e8258a6d..5daa7bb805 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -32,6 +32,7 @@ import traceback
 import warnings
 import weakref
 from collections import deque
+from contextlib import ExitStack
 from datetime import datetime, timedelta
 from inspect import signature
 from typing import (
@@ -2797,63 +2798,63 @@ class DAG(LoggingMixin):
                 self.log.debug("Adding Streamhandler to taskinstance %s", 
ti.task_id)
                 ti.log.addHandler(handler)
 
+        exit_stack = ExitStack()
         if conn_file_path or variable_file_path:
             local_secrets = LocalFilesystemBackend(
                 variables_file_path=variable_file_path, 
connections_file_path=conn_file_path
             )
             secrets_backend_list.insert(0, local_secrets)
+            exit_stack.callback(lambda: secrets_backend_list.pop(0))
+
+        with exit_stack:
+            execution_date = execution_date or timezone.utcnow()
+            self.validate()
+            self.log.debug("Clearing existing task instances for execution 
date %s", execution_date)
+            self.clear(
+                start_date=execution_date,
+                end_date=execution_date,
+                dag_run_state=False,  # type: ignore
+                session=session,
+            )
+            self.log.debug("Getting dagrun for dag %s", self.dag_id)
+            logical_date = timezone.coerce_datetime(execution_date)
+            data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
+            dr: DagRun = _get_or_create_dagrun(
+                dag=self,
+                start_date=execution_date,
+                execution_date=execution_date,
+                run_id=DagRun.generate_run_id(DagRunType.MANUAL, 
execution_date),
+                session=session,
+                conf=run_conf,
+                data_interval=data_interval,
+            )
 
-        execution_date = execution_date or timezone.utcnow()
-        self.validate()
-        self.log.debug("Clearing existing task instances for execution date 
%s", execution_date)
-        self.clear(
-            start_date=execution_date,
-            end_date=execution_date,
-            dag_run_state=False,  # type: ignore
-            session=session,
-        )
-        self.log.debug("Getting dagrun for dag %s", self.dag_id)
-        logical_date = timezone.coerce_datetime(execution_date)
-        data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
-        dr: DagRun = _get_or_create_dagrun(
-            dag=self,
-            start_date=execution_date,
-            execution_date=execution_date,
-            run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
-            session=session,
-            conf=run_conf,
-            data_interval=data_interval,
-        )
-
-        tasks = self.task_dict
-        self.log.debug("starting dagrun")
-        # Instead of starting a scheduler, we run the minimal loop possible to 
check
-        # for task readiness and dependency management. This is notably faster
-        # than creating a BackfillJob and allows us to surface logs to the user
-        while dr.state == DagRunState.RUNNING:
-            session.expire_all()
-            schedulable_tis, _ = dr.update_state(session=session)
-            for s in schedulable_tis:
-                s.state = TaskInstanceState.SCHEDULED
-            session.commit()
-            # triggerer may mark tasks scheduled so we read from DB
-            all_tis = set(dr.get_task_instances(session=session))
-            scheduled_tis = {x for x in all_tis if x.state == 
TaskInstanceState.SCHEDULED}
-            ids_unrunnable = {x for x in all_tis if x.state not in 
State.finished} - scheduled_tis
-            if not scheduled_tis and ids_unrunnable:
-                self.log.warning("No tasks to run. unrunnable tasks: %s", 
ids_unrunnable)
-                time.sleep(1)
-            triggerer_running = _triggerer_is_healthy()
-            for ti in scheduled_tis:
-                try:
-                    add_logger_if_needed(ti)
-                    ti.task = tasks[ti.task_id]
-                    _run_task(ti=ti, inline_trigger=not triggerer_running, 
session=session)
-                except Exception:
-                    self.log.exception("Task failed; ti=%s", ti)
-        if conn_file_path or variable_file_path:
-            # Remove the local variables we have added to the 
secrets_backend_list
-            secrets_backend_list.pop(0)
+            tasks = self.task_dict
+            self.log.debug("starting dagrun")
+            # Instead of starting a scheduler, we run the minimal loop 
possible to check
+            # for task readiness and dependency management. This is notably 
faster
+            # than creating a BackfillJob and allows us to surface logs to the 
user
+            while dr.state == DagRunState.RUNNING:
+                session.expire_all()
+                schedulable_tis, _ = dr.update_state(session=session)
+                for s in schedulable_tis:
+                    s.state = TaskInstanceState.SCHEDULED
+                session.commit()
+                # triggerer may mark tasks scheduled so we read from DB
+                all_tis = set(dr.get_task_instances(session=session))
+                scheduled_tis = {x for x in all_tis if x.state == 
TaskInstanceState.SCHEDULED}
+                ids_unrunnable = {x for x in all_tis if x.state not in 
State.finished} - scheduled_tis
+                if not scheduled_tis and ids_unrunnable:
+                    self.log.warning("No tasks to run. unrunnable tasks: %s", 
ids_unrunnable)
+                    time.sleep(1)
+                triggerer_running = _triggerer_is_healthy()
+                for ti in scheduled_tis:
+                    try:
+                        add_logger_if_needed(ti)
+                        ti.task = tasks[ti.task_id]
+                        _run_task(ti=ti, inline_trigger=not triggerer_running, 
session=session)
+                    except Exception:
+                        self.log.exception("Task failed; ti=%s", ti)
         return dr
 
     @provide_session

Reply via email to the sender.