This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 3a11f7fe95f Fix TaskInstance crash with non-serialized operators 
missing get_weight (#64557) (#64597)
3a11f7fe95f is described below

commit 3a11f7fe95ff18797d7aad9e41e8ff28d36c4802
Author: Rahul Vats <[email protected]>
AuthorDate: Thu Apr 2 04:32:11 2026 +0530

    Fix TaskInstance crash with non-serialized operators missing get_weight 
(#64557) (#64597)
    
    (cherry picked from commit 860277d7babc14f717797e7b2a5eb7547d8ab885)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py     | 11 +++++++++--
 airflow-core/tests/unit/models/test_taskinstance.py | 20 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index e212ca68504..a0b95d8a3ea 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -88,6 +88,7 @@ from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, 
XComModel
 from airflow.settings import task_instance_mutation_hook
+from airflow.task.priority_strategy import 
validate_and_load_priority_weight_strategy
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
@@ -691,7 +692,10 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
 
         :meta private:
         """
-        priority_weight = task.weight_rule.get_weight(
+        weight_rule = task.weight_rule
+        if not hasattr(weight_rule, "get_weight"):
+            weight_rule = 
validate_and_load_priority_weight_strategy(weight_rule)
+        priority_weight = weight_rule.get_weight(
             TaskInstance(task=task, run_id=run_id, map_index=map_index, 
dag_version_id=dag_version_id)
         )
         context_carrier = new_task_run_carrier(dag_run.context_carrier)
@@ -872,7 +876,10 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
         self.queue = task.queue
         self.pool = pool_override or task.pool
         self.pool_slots = task.pool_slots
-        self.priority_weight = self.task.weight_rule.get_weight(self)
+        weight_rule = self.task.weight_rule
+        if not hasattr(weight_rule, "get_weight"):
+            weight_rule = 
validate_and_load_priority_weight_strategy(weight_rule)
+        self.priority_weight = weight_rule.get_weight(self)
         self.run_as_user = task.run_as_user
         # Do not set max_tries to task.retries here because max_tries is a 
cumulative
         # value that needs to be stored in the db.
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index bb058d1a737..07dbde0ec9b 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2653,6 +2653,26 @@ def test_refresh_from_task(pool_override, 
queue_by_policy, monkeypatch):
     assert ti.max_tries == expected_max_tries
 
 
[email protected](
+    ("weight_rule", "expected_weight"),
+    [
+        pytest.param("downstream", 10 + 5, id="downstream-sums-descendants"),
+        pytest.param("upstream", 10, id="upstream-no-ancestors"),
+        pytest.param("absolute", 10, id="absolute-self-only"),
+    ],
+)
+def test_refresh_from_task_with_non_serialized_operator(weight_rule, 
expected_weight):
+    """Regression: TaskInstance must work with non-serialized operators whose 
weight_rule is a WeightRule enum."""
+    with DAG(dag_id="test_dag"):
+        root = EmptyOperator(task_id="root", priority_weight=10, 
weight_rule=weight_rule)
+        child = EmptyOperator(task_id="child", priority_weight=5)
+        root >> child
+
+    ti = TI(root, run_id=None, dag_version_id=mock.MagicMock())
+
+    assert ti.priority_weight == expected_weight
+
+
 class TestTaskInstanceRecordTaskMapXComPush:
     """Test TI.xcom_push() correctly records return values for task-mapping."""
 

Reply via email to