This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e2dc0a9be1 Refactor unneeded 'continue' jumps in jobs (#33846)
e2dc0a9be1 is described below
commit e2dc0a9be1dd48c0841c2f07cb5870920d397c95
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Mon Sep 4 09:38:55 2023 +0000
Refactor unneeded 'continue' jumps in jobs (#33846)
---
airflow/jobs/backfill_job_runner.py | 14 +++++++-------
airflow/jobs/scheduler_job_runner.py | 23 ++++++++++-------------
2 files changed, 17 insertions(+), 20 deletions(-)
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index d93d3594ca..a1fd7a9173 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -656,8 +656,6 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
_per_task_process(key, ti, session)
try:
session.commit()
- # break the retry loop
- break
except OperationalError:
self.log.error(
"Failed to commit task state due to operational error. "
@@ -669,6 +667,9 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
if i == max_attempts - 1:
raise
# retry the loop
+ else:
+ # break the retry loop
+ break
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
self.log.debug(e)
@@ -815,11 +816,10 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
for dagrun_info in dagrun_infos:
for dag in self._get_dag_with_subdags():
dag_run = self._get_dag_run(dagrun_info, dag, session=session)
- if dag_run is None:
- continue
- tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
- ti_status.active_runs.append(dag_run)
- ti_status.to_run.update(tis_map or {})
+ if dag_run is not None:
+ tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
+ ti_status.active_runs.append(dag_run)
+ ti_status.to_run.update(tis_map or {})
processed_dag_run_dates = self._process_backfill_task_instances(
ti_status=ti_status,
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 52f6fb737a..1507a6f06f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -897,13 +897,11 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
)
for dag_run in paused_runs:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
- if dag is None:
- continue
-
- dag_run.dag = dag
- _, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session)
- if callback_to_run:
- self._send_dag_callbacks_to_processor(dag, callback_to_run)
+ if dag is not None:
+ dag_run.dag = dag
+ _, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session)
+ if callback_to_run:
+ self._send_dag_callbacks_to_processor(dag, callback_to_run)
except Exception as e: # should not fail the scheduler
self.log.exception("Failed to update dag run state for paused dags due to %s", e)
@@ -1073,13 +1071,12 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
)
for dag_run, callback_to_run in callback_tuples:
dag = cached_get_dag(dag_run.dag_id)
-
- if not dag:
+ if dag:
+ # Sending callbacks there as in standalone_dag_processor they are adding to the database,
+ # so it must be done outside of prohibit_commit.
+ self._send_dag_callbacks_to_processor(dag, callback_to_run)
+ else:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
- continue
- # Sending callbacks there as in standalone_dag_processor they are adding to the database,
- # so it must be done outside of prohibit_commit.
- self._send_dag_callbacks_to_processor(dag, callback_to_run)
with prohibit_commit(session) as guard:
# Without this, the session has an invalid view of the DB