This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3a75c221367 Fix task_instance_mutation_hook receiving run_id=None
during TaskInstance creation (#63049)
3a75c221367 is described below
commit 3a75c22136749a26dd44bad29e79c2f51c6d9660
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Mar 7 21:56:27 2026 +0100
Fix task_instance_mutation_hook receiving run_id=None during TaskInstance
creation (#63049)
Also add a regression test verifying that the hook sees the correct run_id.
In airflow/models/taskinstance.py, TI.__init__() calls
self.refresh_from_task(task), which invokes
task_instance_mutation_hook(task_instance) via _refresh_from_task().
However, self.run_id is not assigned until after this call, so any mutation
hook that depends on run_id sees None.
```python
self.refresh_from_task(task) # calls hook — self.run_id is None here
...
self.run_id = run_id # too late
```
This means a hook like the one in the issue:
```python
def task_instance_mutation_hook(task_instance):
    if task_instance.run_id.startswith("manual__"):
        task_instance.pool = "manual_pool"
```
fails with "AttributeError: 'NoneType' object has no attribute 'startswith'"
(or silently does nothing if guarded with getattr).
Move the self.run_id assignment before self.refresh_from_task(task) in
TI.__init__():
```python
self.map_index = map_index
if run_id is not None:
    self.run_id = run_id  # set BEFORE refresh_from_task
self.refresh_from_task(task)  # hook now sees correct run_id
```
Co-authored-by: Dev-iL <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 4 ++--
airflow-core/tests/unit/models/test_dagrun.py | 28 +++++++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 8b12db483af..8112d955582 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -566,6 +566,8 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
self.dag_id = task.dag_id
self.task_id = task.task_id
self.map_index = map_index
+ if run_id is not None:
+ self.run_id = run_id
self.refresh_from_task(task)
if TYPE_CHECKING:
@@ -573,8 +575,6 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
# init_on_load will config the log
self.init_on_load()
- if run_id is not None:
- self.run_id = run_id
self.try_number = 0
self.max_tries = self.task.retries
if not self.id:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index bac07ffe1f2..3e62554901c 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -51,6 +51,7 @@ from airflow.sdk.definitions.callback import AsyncCallback
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
from airflow.serialization.definitions.deadline import
SerializedReferenceModels
from airflow.serialization.serialized_objects import LazyDeserializedDAG
+from airflow.settings import get_policy_plugin_manager
from airflow.task.trigger_rule import TriggerRule
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.span_status import SpanStatus
@@ -943,6 +944,33 @@ class TestDagRun:
task = dagrun.get_task_instances()[0]
assert task.queue == "queue1"
+ def test_task_instance_mutation_hook_has_run_id(self, dag_maker, session):
+ """Test that task_instance_mutation_hook receives a TI with run_id set
(not None).
+
+ Regression test for https://github.com/apache/airflow/issues/61945
+ """
+ observed_run_ids = []
+
+ def mutate_task_instance(task_instance):
+ observed_run_ids.append(task_instance.run_id)
+ if task_instance.run_id and
task_instance.run_id.startswith("manual__"):
+ task_instance.pool = "manual_pool"
+
+ with mock.patch.object(
+ get_policy_plugin_manager().hook, "task_instance_mutation_hook",
autospec=True
+ ) as mock_hook:
+ mock_hook.side_effect = mutate_task_instance
+ with dag_maker(
+ dag_id="test_mutation_hook_run_id",
schedule=datetime.timedelta(days=1), session=session
+ ) as dag:
+ EmptyOperator(task_id="mutated_task", owner="test")
+
+ self.create_dag_run(dag, session=session)
+ # The hook should have been called during TI creation with run_id set
+ assert any(rid is not None for rid in observed_run_ids), (
+ f"task_instance_mutation_hook was called with run_id=None.
Observed run_ids: {observed_run_ids}"
+ )
+
@pytest.mark.parametrize(
("prev_ti_state", "is_ti_schedulable"),
[