This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 905baf9 Send SLA callback to processor when DagRun has completed (#20683)
905baf9 is described below
commit 905baf9fa5402ccc062536915fd1911d812f625b
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Jan 9 23:22:22 2022 +0100
Send SLA callback to processor when DagRun has completed (#20683)
* Send SLA callback to processor when DagRun has completed
Currently, SLA callbacks are sent every time a DagRun is examined. This
causes SLA callbacks to be run too often and causes processors to time out
at times. Also, deleted DAGs are not recreated when there are many SLAs.
This PR addresses this by sending SLA callbacks to the processor when a
DagRun completes.
---
airflow/jobs/scheduler_job.py | 5 +++--
tests/jobs/test_scheduler_job.py | 43 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index cc45f3b..95e10a0 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1071,6 +1071,7 @@ class SchedulerJob(BaseJob):
# Send SLA & DAG Success/Failure Callbacks to be executed
self._send_dag_callbacks_to_processor(dag, callback_to_execute)
+ self._send_sla_callbacks_to_processor(dag)
# Because we send the callback here, we need to return None
return callback
@@ -1086,7 +1087,8 @@ class SchedulerJob(BaseJob):
# Work out if we should allow creating a new DagRun now?
if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
-
+ # Send SLA Callbacks to be executed
+ self._send_sla_callbacks_to_processor(dag)
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
@@ -1115,7 +1117,6 @@ class SchedulerJob(BaseJob):
if not self.processor_agent:
raise ValueError("Processor agent is not started.")
- self._send_sla_callbacks_to_processor(dag)
if callback:
self.processor_agent.send_callback_to_execute(callback)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6d7e979..81cdf92 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2574,6 +2574,49 @@ class TestSchedulerJob:
full_filepath=dag.fileloc, dag_id=dag_id
)
+ def test_sla_sent_to_processor_when_dagrun_completes(self, dag_maker,
session):
+ """Test that SLA is sent to the processor when the dagrun completes"""
+ with dag_maker() as dag:
+ DummyOperator(task_id='task', sla=timedelta(hours=1))
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor(do_update=False)
+ self.scheduler_job.processor_agent =
mock.MagicMock(spec=DagFileProcessorAgent)
+ mock_sla_callback = mock.MagicMock()
+ self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
+ assert session.query(DagRun).count() == 0
+ dag_models = DagModel.dags_needing_dagruns(session).all()
+ self.scheduler_job._create_dag_runs(dag_models, session)
+ dr = session.query(DagRun).one()
+ dr.state = DagRunState.SUCCESS
+ ti = dr.get_task_instance('task', session)
+ ti.state = TaskInstanceState.SUCCESS
+ session.merge(ti)
+ session.merge(dr)
+ session.flush()
+ self.scheduler_job._schedule_dag_run(dr, session)
+ dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
+
self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
+
+ def test_sla_sent_to_processor_when_dagrun_timeout(self, dag_maker,
session):
+ """Test that SLA is sent to the processor when the dagrun timeout"""
+ with dag_maker(dagrun_timeout=datetime.timedelta(seconds=60)) as dag:
+ DummyOperator(task_id='task', sla=timedelta(hours=1))
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor(do_update=False)
+ self.scheduler_job.processor_agent =
mock.MagicMock(spec=DagFileProcessorAgent)
+ mock_sla_callback = mock.MagicMock()
+ self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
+ assert session.query(DagRun).count() == 0
+ dag_models = DagModel.dags_needing_dagruns(session).all()
+ self.scheduler_job._create_dag_runs(dag_models, session)
+ dr = session.query(DagRun).one()
+ dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
+ session.merge(dr)
+ session.flush()
+ self.scheduler_job._schedule_dag_run(dr, session)
+ dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
+
self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
+
def test_create_dag_runs(self, dag_maker):
"""
Test various invariants of _create_dag_runs.