This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cec2a78ac8 Fix triggerer errors after Airflow 2 to 3 migration (#55884)
7cec2a78ac8 is described below

commit 7cec2a78ac8f511a1da94048d4dba8863751ec87
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Oct 22 23:23:01 2025 +0100

    Fix triggerer errors after Airflow 2 to 3 migration (#55884)
    
    * Fix triggerer errors after Airflow 2 to 3 migration
    
    When upgrading from Airflow 2, existing deferred triggers can reference
    TaskInstances without a dag_version_id and DagRuns with conf=None. This
    caused errors when the triggerer tried to start those triggers and when
    workers consumed ti_run responses.
    
    This change:
    1. Skips starting triggers whose TaskInstance lacks dag_version_id, logging
    a warning instead of erroring
    2. Coerces DagRun.conf from None to {} in the ti_run response for
    compatibility with Airflow 2-era data
    3. Adds unit tests covering both behaviors
    
    This prevents triggerer crashes and makes deferred tasks resume reliably
    after migration.
    
    * Remove config check as that has been addressed in a different PR
    
    * Add comment on why we added this
    
    * Remove null conf test
---
 .../src/airflow/jobs/triggerer_job_runner.py       |  8 +++++-
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index f5bc2d94d06..617643c3f24 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -649,7 +649,13 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
             )
             if new_trigger_orm.task_instance:
                 log_path = render_log_fname(ti=new_trigger_orm.task_instance)
-
+                if not new_trigger_orm.task_instance.dag_version_id:
+                    # This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
+                    log.warning(
+                        "TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
+                        ti_id=new_trigger_orm.task_instance.id,
+                    )
+                    continue
                 ser_ti = workloads.TaskInstance.model_validate(
                     new_trigger_orm.task_instance, from_attributes=True
                 )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 1e6eb1b9d6f..14302e36a83 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1140,6 +1140,38 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
     assert trigger_orm2.id in trigger_ids
 
 
+def test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_builder, dag_maker):
+    """
+    Ensure supervisor skips creating a trigger when the linked TaskInstance has no dag_version_id.
+    """
+    with dag_maker(dag_id="test_no_dag_version"):
+        EmptyOperator(task_id="t1")
+    dr = dag_maker.create_dagrun()
+    ti = dr.task_instances[0]
+
+    # Create a Trigger and link it to the TaskInstance
+    trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
+    trigger_orm = Trigger.from_object(trigger)
+    session.add(trigger_orm)
+    session.flush()
+
+    ti.trigger_id = trigger_orm.id
+    # Explicitly remove dag_version_id
+    ti.dag_version_id = None
+    session.merge(ti)
+    session.commit()
+
+    supervisor = supervisor_builder()
+
+    # Attempt to enqueue creation of this trigger
+    supervisor.update_triggers({trigger_orm.id})
+
+    # Assert that nothing was queued for creation and no subprocess writes happened
+    assert len(supervisor.creating_triggers) == 0
+    assert trigger_orm.id not in supervisor.running_triggers
+    supervisor.stdin.write.assert_not_called()
+
+
 class TestTriggererMessageTypes:
     def test_message_types_in_triggerer(self):
         """

Reply via email to