This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f9075512c59b57c18f3af0de885f75b26ac66150 Author: Joseph Min <[email protected]> AuthorDate: Mon Jun 3 01:54:58 2024 -0700 Fix triggerer race condition in HA setting (#38666) Co-authored-by: Tzu-ping Chung <[email protected]> (cherry picked from commit da3a77a3d5b8a8a80e7d46c43dcd656e401b7fea) --- airflow/jobs/triggerer_job_runner.py | 14 +++++++ tests/jobs/test_triggerer_job.py | 80 ++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index bb151b32cc..249edc4760 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -672,6 +672,20 @@ class TriggerRunner(threading.Thread, LoggingMixin): self.failed_triggers.append((new_id, e)) continue + # If new_trigger_orm.task_instance is None, this means the TaskInstance + # row was updated by either Trigger.submit_event or Trigger.submit_failure + # and can happen when a single trigger Job is being run on multiple TriggerRunners + # in a High-Availability setup. + if new_trigger_orm.task_instance is None: + self.log.info( + ( + "TaskInstance for Trigger ID %s is None. It was likely updated by another trigger job. " + "Skipping trigger instantiation." + ), + new_id, + ) + continue + try: new_trigger_instance = trigger_class(**new_trigger_orm.kwargs) except TypeError as err: diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 9279c97d53..cba68a5d31 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -309,6 +309,86 @@ class TestTriggerRunner: assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text +@pytest.mark.asyncio +async def test_trigger_create_race_condition_38599(session, tmp_path): + """ + This verifies the resolution of race condition documented in github issue #38599. + More details in the issue description. + + The race condition may occur in the following scenario: + 1. 
TaskInstance TI1 defers itself, which creates Trigger T1, which holds a + reference to TI1. + 2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1. + 3. TJR1 misses a heartbeat, most likely due to high host load causing delays in + each TriggererJobRunner._run_trigger_loop loop. + 4. A second TriggererJobRunner TJR2 notices that TJR1 has missed its heartbeat, + so it starts the process of picking up any Triggers that TJR1 may have had, + including T1. + 5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it + up by clearing the trigger_id of TI1. + 6. TJR2 tries to execute T1, but it crashes (with AttributeError: 'NoneType' object + has no attribute 'dag_id') while trying to + look up TI1 (because T1 no longer has a TaskInstance linked to it). + """ + path = tmp_path / "test_trigger_create_after_completion.txt" + trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix()) + trigger_orm = Trigger.from_object(trigger) + trigger_orm.id = 1 + session.add(trigger_orm) + + dag = DagModel(dag_id="test-dag") + dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none") + ti = TaskInstance( + PythonOperator(task_id="dummy-task", python_callable=print), + run_id=dag_run.run_id, + state=TaskInstanceState.DEFERRED, + ) + ti.dag_id = dag.dag_id + ti.trigger_id = 1 + session.add(dag) + session.add(dag_run) + session.add(ti) + + job1 = Job() + job2 = Job() + session.add(job1) + session.add(job2) + + session.commit() + + job_runner1 = TriggererJobRunner(job1) + job_runner2 = TriggererJobRunner(job2) + + # Assign and run the trigger on the first TriggererJobRunner + # Instead of running job_runner1._execute, we will run the individual methods + # to control the timing of the execution. 
+ job_runner1.load_triggers() + assert len(job_runner1.trigger_runner.to_create) == 1 + # Before calling job_runner1.handle_events, run the trigger synchronously + await job_runner1.trigger_runner.create_triggers() + assert len(job_runner1.trigger_runner.triggers) == 1 + _, trigger_task_info = next(iter(job_runner1.trigger_runner.triggers.items())) + await trigger_task_info["task"] + assert trigger_task_info["task"].done() + + # In a real execution environment, a missed heartbeat would cause the trigger to be picked up + # by another TriggererJobRunner. + # In this test, however, this is not necessary because we are controlling the execution + # of the TriggererJobRunner. + # job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(hours=1) + # session.commit() + + # This calls Trigger.submit_event, which will unlink the trigger from the task instance + job_runner1.handle_events() + + # Simulate the second TriggererJobRunner picking up the trigger + job_runner2.trigger_runner.update_triggers({trigger_orm.id}) + # The race condition happens here. + # AttributeError: 'NoneType' object has no attribute 'dag_id' + await job_runner2.trigger_runner.create_triggers() + + assert path.read_text() == "hi\n" + + def test_trigger_create_race_condition_18392(session, tmp_path): """ This verifies the resolution of race condition documented in github issue #18392.
