This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ecbc95981213ce8178af249a891e2b3065e3f028 Author: Daniel Standish <[email protected]> AuthorDate: Mon Nov 27 11:07:10 2023 -0800 Use ExitStack to manage mutation of secrets_backend_list in dag.test (#34620) Although it requires another indent, it's cleaner, and more importantly it makes sure that the mutation is undone after failure. (cherry picked from commit 99b4eb769d2a3b6692de9c0d83ba64041abf5789) --- airflow/models/dag.py | 103 +++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 51 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 27e8258a6d..5daa7bb805 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -32,6 +32,7 @@ import traceback import warnings import weakref from collections import deque +from contextlib import ExitStack from datetime import datetime, timedelta from inspect import signature from typing import ( @@ -2797,63 +2798,63 @@ class DAG(LoggingMixin): self.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id) ti.log.addHandler(handler) + exit_stack = ExitStack() if conn_file_path or variable_file_path: local_secrets = LocalFilesystemBackend( variables_file_path=variable_file_path, connections_file_path=conn_file_path ) secrets_backend_list.insert(0, local_secrets) + exit_stack.callback(lambda: secrets_backend_list.pop(0)) + + with exit_stack: + execution_date = execution_date or timezone.utcnow() + self.validate() + self.log.debug("Clearing existing task instances for execution date %s", execution_date) + self.clear( + start_date=execution_date, + end_date=execution_date, + dag_run_state=False, # type: ignore + session=session, + ) + self.log.debug("Getting dagrun for dag %s", self.dag_id) + logical_date = timezone.coerce_datetime(execution_date) + data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date) + dr: DagRun = _get_or_create_dagrun( + dag=self, + start_date=execution_date, + execution_date=execution_date, + run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date), + session=session, + conf=run_conf, + data_interval=data_interval, + ) - execution_date = execution_date or timezone.utcnow() - self.validate() - self.log.debug("Clearing existing task instances for execution date %s", execution_date) - self.clear( - start_date=execution_date, - end_date=execution_date, - dag_run_state=False, # type: ignore - session=session, - ) - self.log.debug("Getting dagrun for dag %s", self.dag_id) - logical_date = timezone.coerce_datetime(execution_date) - data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date) - dr: DagRun = _get_or_create_dagrun( - dag=self, - start_date=execution_date, - execution_date=execution_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date), - session=session, - conf=run_conf, - data_interval=data_interval, - ) - - tasks = self.task_dict - self.log.debug("starting dagrun") - # Instead of starting a scheduler, we run the minimal loop possible to check - # for task readiness and dependency management. This is notably faster - # than creating a BackfillJob and allows us to surface logs to the user - while dr.state == DagRunState.RUNNING: - session.expire_all() - schedulable_tis, _ = dr.update_state(session=session) - for s in schedulable_tis: - s.state = TaskInstanceState.SCHEDULED - session.commit() - # triggerer may mark tasks scheduled so we read from DB - all_tis = set(dr.get_task_instances(session=session)) - scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED} - ids_unrunnable = {x for x in all_tis if x.state not in State.finished} - scheduled_tis - if not scheduled_tis and ids_unrunnable: - self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable) - time.sleep(1) - triggerer_running = _triggerer_is_healthy() - for ti in scheduled_tis: - try: - add_logger_if_needed(ti) - ti.task = tasks[ti.task_id] - _run_task(ti=ti, inline_trigger=not triggerer_running, session=session) - except Exception: - self.log.exception("Task failed; ti=%s", ti) - if conn_file_path or variable_file_path: - # Remove the local variables we have added to the secrets_backend_list - secrets_backend_list.pop(0) + tasks = self.task_dict + self.log.debug("starting dagrun") + # Instead of starting a scheduler, we run the minimal loop possible to check + # for task readiness and dependency management. This is notably faster + # than creating a BackfillJob and allows us to surface logs to the user + while dr.state == DagRunState.RUNNING: + session.expire_all() + schedulable_tis, _ = dr.update_state(session=session) + for s in schedulable_tis: + s.state = TaskInstanceState.SCHEDULED + session.commit() + # triggerer may mark tasks scheduled so we read from DB + all_tis = set(dr.get_task_instances(session=session)) + scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED} + ids_unrunnable = {x for x in all_tis if x.state not in State.finished} - scheduled_tis + if not scheduled_tis and ids_unrunnable: + self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable) + time.sleep(1) + triggerer_running = _triggerer_is_healthy() + for ti in scheduled_tis: + try: + add_logger_if_needed(ti) + ti.task = tasks[ti.task_id] + _run_task(ti=ti, inline_trigger=not triggerer_running, session=session) + except Exception: + self.log.exception("Task failed; ti=%s", ti) return dr @provide_session
