This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e2dc0a9be1 Refactor unneeded 'continue' jumps in jobs (#33846)
e2dc0a9be1 is described below

commit e2dc0a9be1dd48c0841c2f07cb5870920d397c95
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Mon Sep 4 09:38:55 2023 +0000

    Refactor unneeded 'continue' jumps in jobs (#33846)
---
 airflow/jobs/backfill_job_runner.py  | 14 +++++++-------
 airflow/jobs/scheduler_job_runner.py | 23 ++++++++++-------------
 2 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index d93d3594ca..a1fd7a9173 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -656,8 +656,6 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
                             _per_task_process(key, ti, session)
                             try:
                                 session.commit()
-                                # break the retry loop
-                                break
                             except OperationalError:
                                 self.log.error(
                                     "Failed to commit task state due to 
operational error. "
@@ -669,6 +667,9 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
                                 if i == max_attempts - 1:
                                     raise
                                 # retry the loop
+                            else:
+                                # break the retry loop
+                                break
             except (NoAvailablePoolSlot, DagConcurrencyLimitReached, 
TaskConcurrencyLimitReached) as e:
                 self.log.debug(e)
 
@@ -815,11 +816,10 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
         for dagrun_info in dagrun_infos:
             for dag in self._get_dag_with_subdags():
                 dag_run = self._get_dag_run(dagrun_info, dag, session=session)
-                if dag_run is None:
-                    continue
-                tis_map = self._task_instances_for_dag_run(dag, dag_run, 
session=session)
-                ti_status.active_runs.append(dag_run)
-                ti_status.to_run.update(tis_map or {})
+                if dag_run is not None:
+                    tis_map = self._task_instances_for_dag_run(dag, dag_run, 
session=session)
+                    ti_status.active_runs.append(dag_run)
+                    ti_status.to_run.update(tis_map or {})
 
         processed_dag_run_dates = self._process_backfill_task_instances(
             ti_status=ti_status,
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 52f6fb737a..1507a6f06f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -897,13 +897,11 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
             )
             for dag_run in paused_runs:
                 dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
-                if dag is None:
-                    continue
-
-                dag_run.dag = dag
-                _, callback_to_run = 
dag_run.update_state(execute_callbacks=False, session=session)
-                if callback_to_run:
-                    self._send_dag_callbacks_to_processor(dag, callback_to_run)
+                if dag is not None:
+                    dag_run.dag = dag
+                    _, callback_to_run = 
dag_run.update_state(execute_callbacks=False, session=session)
+                    if callback_to_run:
+                        self._send_dag_callbacks_to_processor(dag, 
callback_to_run)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags 
due to %s", e)
 
@@ -1073,13 +1071,12 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
         )
         for dag_run, callback_to_run in callback_tuples:
             dag = cached_get_dag(dag_run.dag_id)
-
-            if not dag:
+            if dag:
+                # Sending callbacks there as in standalone_dag_processor they 
are adding to the database,
+                # so it must be done outside of prohibit_commit.
+                self._send_dag_callbacks_to_processor(dag, callback_to_run)
+            else:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_run.dag_id)
-                continue
-            # Sending callbacks there as in standalone_dag_processor they are 
adding to the database,
-            # so it must be done outside of prohibit_commit.
-            self._send_dag_callbacks_to_processor(dag, callback_to_run)
 
         with prohibit_commit(session) as guard:
             # Without this, the session has an invalid view of the DB

Reply via email to