This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a75c221367 Fix task_instance_mutation_hook receiving run_id=None during TaskInstance creation (#63049)
3a75c221367 is described below

commit 3a75c22136749a26dd44bad29e79c2f51c6d9660
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Mar 7 21:56:27 2026 +0100

    Fix task_instance_mutation_hook receiving run_id=None during TaskInstance creation (#63049)
    
    Also add a regression test verifying that the hook sees the correct run_id.
    
    In airflow/models/taskinstance.py, TI.__init__() calls
    self.refresh_from_task(task), which invokes
    task_instance_mutation_hook(task_instance) via _refresh_from_task().
    However, self.run_id is not assigned until after this call, so any mutation
    hook that depends on run_id sees None.
    
    ```python
    self.refresh_from_task(task)  # calls hook — self.run_id is None here
    ...
    self.run_id = run_id          # too late
    ```
    
    This means a hook like the one in the issue:
    
    ```python
    def task_instance_mutation_hook(task_instance):
        if task_instance.run_id.startswith("manual__"):
            task_instance.pool = "manual_pool"
    ```
    
    fails with AttributeError: 'NoneType' object has no attribute 'startswith'
    (or silently does nothing if the hook guards against a None run_id).
    
    Fix: move the run_id assignment before self.refresh_from_task(task) in
    TI.__init__():
    
    ```python
    self.map_index = map_index
    if run_id is not None:
        self.run_id = run_id      # set BEFORE refresh_from_task
    self.refresh_from_task(task)  # hook now sees the correct run_id
    ```
    
    Co-authored-by: Dev-iL <[email protected]>
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py |  4 ++--
 airflow-core/tests/unit/models/test_dagrun.py   | 28 +++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 8b12db483af..8112d955582 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -566,6 +566,8 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.map_index = map_index
+        if run_id is not None:
+            self.run_id = run_id
 
         self.refresh_from_task(task)
         if TYPE_CHECKING:
@@ -573,8 +575,6 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
         # init_on_load will config the log
         self.init_on_load()
 
-        if run_id is not None:
-            self.run_id = run_id
         self.try_number = 0
         self.max_tries = self.task.retries
         if not self.id:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index bac07ffe1f2..3e62554901c 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -51,6 +51,7 @@ from airflow.sdk.definitions.callback import AsyncCallback
 from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
 from airflow.serialization.definitions.deadline import 
SerializedReferenceModels
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
+from airflow.settings import get_policy_plugin_manager
 from airflow.task.trigger_rule import TriggerRule
 from airflow.triggers.base import StartTriggerArgs
 from airflow.utils.span_status import SpanStatus
@@ -943,6 +944,33 @@ class TestDagRun:
         task = dagrun.get_task_instances()[0]
         assert task.queue == "queue1"
 
+    def test_task_instance_mutation_hook_has_run_id(self, dag_maker, session):
+        """Test that task_instance_mutation_hook receives a TI with run_id set 
(not None).
+
+        Regression test for https://github.com/apache/airflow/issues/61945
+        """
+        observed_run_ids = []
+
+        def mutate_task_instance(task_instance):
+            observed_run_ids.append(task_instance.run_id)
+            if task_instance.run_id and 
task_instance.run_id.startswith("manual__"):
+                task_instance.pool = "manual_pool"
+
+        with mock.patch.object(
+            get_policy_plugin_manager().hook, "task_instance_mutation_hook", 
autospec=True
+        ) as mock_hook:
+            mock_hook.side_effect = mutate_task_instance
+            with dag_maker(
+                dag_id="test_mutation_hook_run_id", 
schedule=datetime.timedelta(days=1), session=session
+            ) as dag:
+                EmptyOperator(task_id="mutated_task", owner="test")
+
+            self.create_dag_run(dag, session=session)
+        # The hook should have been called during TI creation with run_id set
+        assert any(rid is not None for rid in observed_run_ids), (
+            f"task_instance_mutation_hook was called with run_id=None. 
Observed run_ids: {observed_run_ids}"
+        )
+
     @pytest.mark.parametrize(
         ("prev_ti_state", "is_ti_schedulable"),
         [

Reply via email to