This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 3a11f7fe95f Fix TaskInstance crash with non-serialized operators missing get_weight (#64557) (#64597)
3a11f7fe95f is described below
commit 3a11f7fe95ff18797d7aad9e41e8ff28d36c4802
Author: Rahul Vats <[email protected]>
AuthorDate: Thu Apr 2 04:32:11 2026 +0530
Fix TaskInstance crash with non-serialized operators missing get_weight (#64557) (#64597)
(cherry picked from commit 860277d7babc14f717797e7b2a5eb7547d8ab885)
Co-authored-by: Kaxil Naik <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 11 +++++++++--
airflow-core/tests/unit/models/test_taskinstance.py | 20 ++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index e212ca68504..a0b95d8a3ea 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -88,6 +88,7 @@ from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel
from airflow.settings import task_instance_mutation_hook
+from airflow.task.priority_strategy import validate_and_load_priority_weight_strategy
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
@@ -691,7 +692,10 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
:meta private:
"""
- priority_weight = task.weight_rule.get_weight(
+ weight_rule = task.weight_rule
+ if not hasattr(weight_rule, "get_weight"):
+ weight_rule = validate_and_load_priority_weight_strategy(weight_rule)
+ priority_weight = weight_rule.get_weight(
TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id)
)
context_carrier = new_task_run_carrier(dag_run.context_carrier)
@@ -872,7 +876,10 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
self.queue = task.queue
self.pool = pool_override or task.pool
self.pool_slots = task.pool_slots
- self.priority_weight = self.task.weight_rule.get_weight(self)
+ weight_rule = self.task.weight_rule
+ if not hasattr(weight_rule, "get_weight"):
+ weight_rule = validate_and_load_priority_weight_strategy(weight_rule)
+ self.priority_weight = weight_rule.get_weight(self)
self.run_as_user = task.run_as_user
# Do not set max_tries to task.retries here because max_tries is a cumulative
# value that needs to be stored in the db.
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index bb058d1a737..07dbde0ec9b 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2653,6 +2653,26 @@ def test_refresh_from_task(pool_override, queue_by_policy, monkeypatch):
assert ti.max_tries == expected_max_tries
[email protected](
+ ("weight_rule", "expected_weight"),
+ [
+ pytest.param("downstream", 10 + 5, id="downstream-sums-descendants"),
+ pytest.param("upstream", 10, id="upstream-no-ancestors"),
+ pytest.param("absolute", 10, id="absolute-self-only"),
+ ],
+)
+def test_refresh_from_task_with_non_serialized_operator(weight_rule, expected_weight):
+ """Regression: TaskInstance must work with non-serialized operators whose weight_rule is a WeightRule enum."""
+ with DAG(dag_id="test_dag"):
+ root = EmptyOperator(task_id="root", priority_weight=10, weight_rule=weight_rule)
+ child = EmptyOperator(task_id="child", priority_weight=5)
+ root >> child
+
+ ti = TI(root, run_id=None, dag_version_id=mock.MagicMock())
+
+ assert ti.priority_weight == expected_weight
+
+
class TestTaskInstanceRecordTaskMapXComPush:
"""Test TI.xcom_push() correctly records return values for task-mapping."""