This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f9075512c59b57c18f3af0de885f75b26ac66150
Author: Joseph Min <[email protected]>
AuthorDate: Mon Jun 3 01:54:58 2024 -0700

    Fix triggerer race condition in HA setting (#38666)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit da3a77a3d5b8a8a80e7d46c43dcd656e401b7fea)
---
 airflow/jobs/triggerer_job_runner.py | 14 +++++++
 tests/jobs/test_triggerer_job.py     | 80 ++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..249edc4760 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -672,6 +672,20 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.failed_triggers.append((new_id, e))
                 continue
 
+            # If new_trigger_orm.task_instance is None, this means the TaskInstance
+            # row was updated by either Trigger.submit_event or Trigger.submit_failure
+            # and can happen when a single trigger Job is being run on multiple TriggerRunners
+            # in a High-Availability setup.
+            if new_trigger_orm.task_instance is None:
+                self.log.info(
+                    (
+                        "TaskInstance for Trigger ID %s is None. It was likely updated by another trigger job. "
+                        "Skipping trigger instantiation."
+                    ),
+                    new_id,
+                )
+                continue
+
             try:
                 new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
             except TypeError as err:
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 9279c97d53..cba68a5d31 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -309,6 +309,86 @@ class TestTriggerRunner:
         assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text
 
 
+@pytest.mark.asyncio
+async def test_trigger_create_race_condition_38599(session, tmp_path):
+    """
+    This verifies the resolution of race condition documented in github issue #38599.
+    More details in the issue description.
+
+    The race condition may occur in the following scenario:
+        1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a
+            reference to TI1.
+        2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1.
+        3. TJR1 misses a heartbeat, most likely due to high host load causing delays in
+            each TriggererJobRunner._run_trigger_loop loop.
+        4. A second TriggererJobRunner TJR2 notices that T1 has missed its heartbeat,
+            so it starts the process of picking up any Triggers that TJR1 may have had,
+            including T1.
+        5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it
+            up by clearing the trigger_id of TI1.
+        6. TJR2 tries to execute T1, but it crashes (with the above error) while trying to
+            look up TI1 (because T1 no longer has a TaskInstance linked to it).
+    """
+    path = tmp_path / "test_trigger_create_after_completion.txt"
+    trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix())
+    trigger_orm = Trigger.from_object(trigger)
+    trigger_orm.id = 1
+    session.add(trigger_orm)
+
+    dag = DagModel(dag_id="test-dag")
+    dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
+    ti = TaskInstance(
+        PythonOperator(task_id="dummy-task", python_callable=print),
+        run_id=dag_run.run_id,
+        state=TaskInstanceState.DEFERRED,
+    )
+    ti.dag_id = dag.dag_id
+    ti.trigger_id = 1
+    session.add(dag)
+    session.add(dag_run)
+    session.add(ti)
+
+    job1 = Job()
+    job2 = Job()
+    session.add(job1)
+    session.add(job2)
+
+    session.commit()
+
+    job_runner1 = TriggererJobRunner(job1)
+    job_runner2 = TriggererJobRunner(job2)
+
+    # Assign and run the trigger on the first TriggererJobRunner
+    # Instead of running job_runner1._execute, we will run the individual methods
+    # to control the timing of the execution.
+    job_runner1.load_triggers()
+    assert len(job_runner1.trigger_runner.to_create) == 1
+    # Before calling job_runner1.handle_events, run the trigger synchronously
+    await job_runner1.trigger_runner.create_triggers()
+    assert len(job_runner1.trigger_runner.triggers) == 1
+    _, trigger_task_info = next(iter(job_runner1.trigger_runner.triggers.items()))
+    await trigger_task_info["task"]
+    assert trigger_task_info["task"].done()
+
+    # In a real execution environment, a missed heartbeat would cause the trigger to be picked up
+    # by another TriggererJobRunner.
+    # In this test, however, this is not necessary because we are controlling the execution
+    # of the TriggererJobRunner.
+    # job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(hours=1)
+    # session.commit()
+
+    # This calls Trigger.submit_event, which will unlink the trigger from the task instance
+    job_runner1.handle_events()
+
+    # Simulate the second TriggererJobRunner picking up the trigger
+    job_runner2.trigger_runner.update_triggers({trigger_orm.id})
+    # The race condition happens here.
+    # AttributeError: 'NoneType' object has no attribute 'dag_id'
+    await job_runner2.trigger_runner.create_triggers()
+
+    assert path.read_text() == "hi\n"
+
+
 def test_trigger_create_race_condition_18392(session, tmp_path):
     """
     This verifies the resolution of race condition documented in github issue #18392.

Reply via email to