This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch sync_2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a8fc06a28fc5c46f3cbcce6951bb106b53cc3aa6
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Nov 17 22:19:31 2024 +0100

    Ensure priority weight is capped at 32-bit integer to prevent roll-over (#43611) (#44045)
    
    * Ensure priority weight is capped at 32-bit integer to prevent roll-over
    
    * Add newsfragment
    
    * Move range check post type check
    
    * Review feedback - consolidate to single implementation for now
    
    (cherry picked from commit ab529d13042c9a9c036cd4a03d04c9aa819adf34)
---
 airflow/models/abstractoperator.py                        | 15 +++++++++------
 airflow/models/baseoperator.py                            |  5 ++++-
 airflow/utils/weight_rule.py                              | 12 ++++++++++++
 .../administration-and-deployment/priority-weight.rst     |  6 ++++++
 newsfragments/43611.significant.rst                       |  6 ++++++
 tests/utils/test_weight_rule.py                           |  9 ++++++++-
 6 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index 45eb3c5fff1..ec3d1f5309a 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -40,7 +40,7 @@ from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.task_group import MappedTaskGroup
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import NOTSET, ArgNotSet
-from airflow.utils.weight_rule import WeightRule
+from airflow.utils.weight_rule import WeightRule, db_safe_priority
 
 TaskStateChangeCallback = Callable[[Context], None]
 
@@ -467,7 +467,7 @@ class AbstractOperator(Templater, DAGNode):
         )
 
         if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
-            return self.priority_weight
+            return db_safe_priority(self.priority_weight)
         elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
             upstream = False
         elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
@@ -476,10 +476,13 @@ class AbstractOperator(Templater, DAGNode):
             upstream = False
         dag = self.get_dag()
         if dag is None:
-            return self.priority_weight
-        return self.priority_weight + sum(
-            dag.task_dict[task_id].priority_weight
-            for task_id in self.get_flat_relative_ids(upstream=upstream)
+            return db_safe_priority(self.priority_weight)
+        return db_safe_priority(
+            self.priority_weight
+            + sum(
+                dag.task_dict[task_id].priority_weight
+                for task_id in self.get_flat_relative_ids(upstream=upstream)
+            )
         )
 
     @cached_property
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 449678860f8..11522060fe0 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -656,6 +656,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
         This allows the executor to trigger higher priority tasks before
         others when things get backed up. Set priority_weight as a higher
         number for more important tasks.
+        As not all database engines support 64-bit integers, values are capped to the signed 32-bit range.
+        Valid range is from -2,147,483,648 to 2,147,483,647.
     :param weight_rule: weighting method used for the effective total
         priority weight of the task. Options are:
         ``{ downstream | upstream | absolute }`` default is ``downstream``
@@ -677,7 +679,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
         Additionally, when set to ``absolute``, there is bonus effect of
         significantly speeding up the task creation process as for very large
         DAGs. Options can be set as string or using the constants defined in
-        the static class ``airflow.utils.WeightRule``
+        the static class ``airflow.utils.WeightRule``.
+        Irrespective of the weight rule, resulting priority values are capped to the signed 32-bit range.
         |experimental|
         Since 2.9.0, Airflow allows to define custom priority weight strategy,
         by creating a subclass of
diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py
index a63358b0322..490bcfbe888 100644
--- a/airflow/utils/weight_rule.py
+++ b/airflow/utils/weight_rule.py
@@ -21,6 +21,18 @@ from enum import Enum
 
 import methodtools
 
+# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
+# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
+# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
+# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
+DB_SAFE_MINIMUM = -2147483648
+DB_SAFE_MAXIMUM = 2147483647
+
+
+def db_safe_priority(priority_weight: int) -> int:
+    """Convert priority weight to a safe value for the database."""
+    return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight))
+
 
 class WeightRule(str, Enum):
     """Weight rules."""
diff --git 
a/docs/apache-airflow/administration-and-deployment/priority-weight.rst 
b/docs/apache-airflow/administration-and-deployment/priority-weight.rst
index dd61d25fcd4..7bdeff64502 100644
--- a/docs/apache-airflow/administration-and-deployment/priority-weight.rst
+++ b/docs/apache-airflow/administration-and-deployment/priority-weight.rst
@@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's 
weighting method is ``dow
 
 The ``priority_weight`` parameter can be used in conjunction with 
:ref:`concepts:pool`.
 
+.. note::
+
+    As most database engines use 32-bit integers, the maximum value for any calculated or
+    defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648.
+
+
 Custom Weight Rule
 ------------------
 
diff --git a/newsfragments/43611.significant.rst 
b/newsfragments/43611.significant.rst
new file mode 100644
index 00000000000..e25fb2a5bba
--- /dev/null
+++ b/newsfragments/43611.significant.rst
@@ -0,0 +1,6 @@
+TaskInstance ``priority_weight`` is capped to the 32-bit signed integer range.
+
+Some database engines are limited to 32-bit integer values. As some users reported errors where the
+weight rolled over to negative values, we decided to cap the value to the 32-bit signed integer range.
+Even though Python internally supports smaller and larger values (64-bit and beyond),
+``priority_weight`` is capped and only stores values from -2147483648 to 2147483647.
diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py
index 73abafe782b..387bb9b09e4 100644
--- a/tests/utils/test_weight_rule.py
+++ b/tests/utils/test_weight_rule.py
@@ -19,7 +19,14 @@ from __future__ import annotations
 
 import pytest
 
-from airflow.utils.weight_rule import WeightRule
+from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, 
WeightRule, db_safe_priority
+
+
+def test_db_safe_priority():
+    assert db_safe_priority(1) == 1
+    assert db_safe_priority(-1) == -1
+    assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM
+    assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM
 
 
 class TestWeightRule:

Reply via email to