This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new da3a77a3d5 Fix triggerer race condition in HA setting (#38666)
da3a77a3d5 is described below
commit da3a77a3d5b8a8a80e7d46c43dcd656e401b7fea
Author: Joseph Min <[email protected]>
AuthorDate: Mon Jun 3 01:54:58 2024 -0700
Fix triggerer race condition in HA setting (#38666)
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/jobs/triggerer_job_runner.py | 14 +++++++
tests/jobs/test_triggerer_job.py | 80 ++++++++++++++++++++++++++++++++++++
2 files changed, 94 insertions(+)
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index 8237e1e493..e1736ae8e5 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -678,6 +678,20 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.failed_triggers.append((new_id, e))
continue
+ # If new_trigger_orm.task_instance is None, the TaskInstance row has already been
+ # updated by either Trigger.submit_event or Trigger.submit_failure. This can happen
+ # when a single trigger is being run by multiple TriggerRunners in a
+ # High-Availability setup (e.g. after one triggerer misses its heartbeat).
+ # in a High-Availability setup.
+ if new_trigger_orm.task_instance is None:
+ self.log.info(
+ (
+ "TaskInstance for Trigger ID %s is None. It was likely
updated by another trigger job. "
+ "Skipping trigger instantiation."
+ ),
+ new_id,
+ )
+ continue
+
try:
new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
except TypeError as err:
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index df9699806f..10d4196ac9 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -309,6 +309,86 @@ class TestTriggerRunner:
assert "got an unexpected keyword argument 'not_exists_arg'" in
caplog.text
[email protected]
+async def test_trigger_create_race_condition_38599(session, tmp_path):
+ """
+ Verify the fix for the triggerer race condition reported in GitHub issue #38599.
+ More details are in the issue description.
+
+ The race condition may occur in the following scenario:
+ 1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a
+ reference to TI1.
+ 2. T1 gets picked up by TriggererJobRunner TJR1, which starts running T1.
+ 3. TJR1 misses a heartbeat, most likely because high host load delays each
+ iteration of TriggererJobRunner._run_trigger_loop.
+ 4. A second TriggererJobRunner TJR2 notices that TJR1 has missed its heartbeat,
+ so it starts picking up any Triggers that TJR1 may have had, including T1.
+ 5. Before TJR2 starts executing T1, TJR1 finishes executing T1 and cleans it
+ up by clearing the trigger_id of TI1.
+ 6. TJR2 tries to execute T1, but crashes while trying to look up TI1 (because
+ T1 no longer has a TaskInstance linked to it) with:
+ AttributeError: 'NoneType' object has no attribute 'dag_id'
+
+ Instead of running the job loops, the test calls the individual
+ TriggererJobRunner methods in a fixed order to reproduce steps 2-6
+ deterministically. It then checks that TJR2's create_triggers() completes
+ without raising.
+ """
+ path = tmp_path / "test_trigger_create_after_completion.txt"
+ trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1),
filename=path.as_posix())
+ trigger_orm = Trigger.from_object(trigger)
+ trigger_orm.id = 1
+ session.add(trigger_orm)
+
+ # Create the deferred TaskInstance (TI1) that references the trigger (T1) via trigger_id.
+ dag = DagModel(dag_id="test-dag")
+ dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
+ ti = TaskInstance(
+ PythonOperator(task_id="dummy-task", python_callable=print),
+ run_id=dag_run.run_id,
+ state=TaskInstanceState.DEFERRED,
+ )
+ ti.dag_id = dag.dag_id
+ ti.trigger_id = 1
+ session.add(dag)
+ session.add(dag_run)
+ session.add(ti)
+
+ # One Job per TriggererJobRunner: job_runner1 plays TJR1, job_runner2 plays TJR2.
+ job1 = Job()
+ job2 = Job()
+ session.add(job1)
+ session.add(job2)
+
+ session.commit()
+
+ job_runner1 = TriggererJobRunner(job1)
+ job_runner2 = TriggererJobRunner(job2)
+
+ # Assign the trigger to the first TriggererJobRunner and run it there.
+ # Instead of running job_runner1._execute, call the individual methods so the
+ # test controls the exact timing of each step.
+ job_runner1.load_triggers()
+ assert len(job_runner1.trigger_runner.to_create) == 1
+ # Before calling job_runner1.handle_events, run the trigger to completion
+ # and wait for it.
+ await job_runner1.trigger_runner.create_triggers()
+ assert len(job_runner1.trigger_runner.triggers) == 1
+ _, trigger_task_info =
next(iter(job_runner1.trigger_runner.triggers.items()))
+ await trigger_task_info["task"]
+ assert trigger_task_info["task"].done()
+
+ # In a real deployment, a missed heartbeat would cause the trigger to be picked up
+ # by another TriggererJobRunner. This test does not need that, because it hands
+ # the trigger to the second runner directly. For reference, simulating the missed
+ # heartbeat would look like this:
+ # job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(hours=1)
+ # session.commit()
+
+ # This calls Trigger.submit_event, which unlinks the trigger from the task instance.
+ job_runner1.handle_events()
+
+ # Simulate the second TriggererJobRunner picking up the (already completed) trigger.
+ job_runner2.trigger_runner.update_triggers({trigger_orm.id})
+ # This is where the race condition surfaced. Before the fix, this call raised
+ # AttributeError: 'NoneType' object has no attribute 'dag_id'
+ # because T1 no longer had a TaskInstance. Now it must complete without raising.
+ await job_runner2.trigger_runner.create_triggers()
+
+ # NOTE(review): presumably TimeDeltaTrigger_ writes "hi\n" to `filename` when it
+ # fires. This checks the output of the run on the first runner. Confirm against
+ # the helper's definition.
+ assert path.read_text() == "hi\n"
+
+
def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue
#18392.