jedcunningham commented on code in PR #39336:
URL: https://github.com/apache/airflow/pull/39336#discussion_r1594440137


##########
airflow/models/taskinstancekey.py:
##########
@@ -37,6 +37,7 @@ def primary(self) -> tuple[str, str, str, int]:
     @property
     def reduced(self) -> TaskInstanceKey:
         """Remake the key by subtracting 1 from try number to match in memory 
information."""
+        # todo (dstandish): remove this property

Review Comment:
   todo reminder :)



##########
airflow/models/taskinstance.py:
##########
@@ -1549,53 +1519,22 @@ def init_on_load(self) -> None:
         """Initialize the attributes that aren't stored in the DB."""
         self.test_mode = False  # can be changed when calling 'run'
 
-    @hybrid_property
-    def try_number(self):
-        """
-        Return the try number that a task number will be when it is actually 
run.
-
-        If the TaskInstance is currently running, this will match the column 
in the
-        database, in all other cases this will be incremented.
-
-        This is designed so that task logs end up in the right file.
-        """
-        return _get_try_number(task_instance=self)
-
-    @try_number.expression
-    def try_number(cls):
-        """
-        Return the expression to be used by SQLAlchemy when filtering on 
try_number.
-
-        This is required because the override in the get_try_number function 
causes
-        try_number values to be off by one when listing tasks in the UI.
-
-        :meta private:
-        """
-        return cls._try_number
-
-    @try_number.setter
-    def try_number(self, value: int) -> None:
-        """
-        Set a task try number.
-
-        :param value: the try number
-        """
-        _set_try_number(task_instance=self, value=value)
-
     @property
+    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
     def prev_attempted_tries(self) -> int:
         """
-        Calculate the number of previously attempted tries, defaulting to 0.
+        Calculate the total number of attempted tries, defaulting to 0.
+
+        This used to be necessary because try_number did not always tell the 
truth.
 
-        Expose this for the Task Tries and Gantt graph views.
-        Using `try_number` throws off the counts for non-running tasks.
-        Also useful in error logging contexts to get the try number for the 
last try that was attempted.
+        :meta private:
         """
-        return self._try_number
+        return self.try_number
 
     @property
     def next_try_number(self) -> int:
-        return self._try_number + 1
+        # todo (dstandish): deprecate this property; we don't need a property 
that is just + 1

Review Comment:
   todo reminder :)



##########
airflow/models/taskinstance.py:
##########
@@ -1508,6 +1458,26 @@ def __init__(
     def __hash__(self):
         return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
 
+    @property
+    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
+    def _try_number(self):
+        """
+        Do not use.  For semblance of backcompat.
+
+        :meta private:
+        """
+        return self.try_number
+
+    @_try_number.setter
+    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
+    def _try_number(self, val):
+        """
+        Do not use.  For semblance of backcompat.

Review Comment:
   ```suggestion
           Do not use. For semblance of backcompat.
   ```
   
   nit



##########
airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -21,13 +21,28 @@
 from contextlib import suppress
 from typing import TYPE_CHECKING
 
+try:
+    from airflow import __version__ as airflow_version

Review Comment:
   This always exists in 2.7+. In fact, I think it works all the back to 2.4 :)



##########
tests/jobs/test_backfill_job.py:
##########
@@ -1525,7 +1544,7 @@ def test_update_counters(self, dag_maker, session):
         # match what's in the in-memory ti_status.running map. This is the same
         # for skipped, failed and retry states.
         ti_status.running[ti.key] = ti  # Task is queued and marked as running
-        ti._try_number += 1  # Try number is increased during ti.run()
+        ti.try_number += 1  # Try number is increased during ti.run()

Review Comment:
   This comment isn't right anymore, right?



##########
newsfragments/39336.significant.rst:
##########
@@ -0,0 +1,5 @@
+``try_number`` is no longer incremented during task execution
+
+Previously, the try_number was incremented at the beginning of task execution 
on the worker.  This was problematic for many reasons. For one it meant that 
the try_number was incremented when it was not supposed to, namely when 
resuming from reschedule or deferral. And it also resulted in the try_number 
being "wrong" when the task had not yet started.  The workarounds for these two 
issues caused a lot of confusion.  What we do instead now, is we only increment 
the try_number when it is scheduled by the scheduler.  So the try number for a 
task run is determined in advanced, and does not change in flight, and it is 
never decremented.  So after the task runs, the observed try_number be remain 
the same as it was when the task was running; only when there is a "new try" 
will the try_number be incremented again.
+
+One consequence of this change is, if users were "manually" running tasks 
(e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``), 
try_number will no longer be incremented.  Airflow assumes that tasks are 
always run after being scheduled by the scheduler, so we do not regard this as 
a breaking change.

Review Comment:
   ```suggestion
   Previously, the ``try_number`` was incremented at the beginning of task 
execution on the worker. This was problematic for many reasons. For one it 
meant that the ``try_number`` was incremented when it was not supposed to, 
namely when resuming from reschedule or deferral. And it also resulted in the 
``try_number`` being "wrong" when the task had not yet started. The workarounds 
for these two issues caused a lot of confusion. Not instead, we only increment 
the ``try_number`` when the task is scheduled by the scheduler. The 
``try_number`` for a task run is determined in advanced, and does not change in 
flight, and it is never decremented. So after the task runs, the observed 
``try_number`` remains the same as it was when the task was running; only when 
there is a "new try" will the ``try_number`` be incremented again.
   
   One consequence of this change is, if users were "manually" running tasks 
(e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``), 
``try_number`` will no longer be incremented. Airflow assumes that tasks are 
always run after being scheduled by the scheduler, so we do not regard this as 
a breaking change.
   ```
   
   We could convert a few ``try_number``s to "try number" if we want less 
inline code.



##########
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -527,7 +527,7 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
                         ti.queue,
                         ti.command_as_list(),
                         ti.executor_config,
-                        ti.prev_attempted_tries,
+                        ti.try_number,  # prev attempted tries

Review Comment:
   ```suggestion
                           ti.try_number,
   ```
   
   Not sure we need this comment?



##########
airflow/www/views.py:
##########
@@ -5196,7 +5196,7 @@ class TaskInstanceModelView(AirflowModelView):
         "pool",
         "queued_by_job_id",
     ]
-
+    # todo: don't use prev_attempted_tries; just use try_number

Review Comment:
   todo reminder :)



##########
airflow/models/taskinstance.py:
##########
@@ -1508,6 +1458,26 @@ def __init__(
     def __hash__(self):
         return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
 
+    @property
+    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
+    def _try_number(self):
+        """
+        Do not use.  For semblance of backcompat.

Review Comment:
   ```suggestion
           Do not use. For semblance of backcompat.
   ```
   
   nit!



##########
tests/jobs/test_backfill_job.py:
##########
@@ -185,7 +185,7 @@ def test_trigger_controller_dag(self, session):
 
         assert task_instances_list
 
-    @pytest.mark.backend("postgres", "mysql")
+    # @pytest.mark.backend("postgres", "mysql")

Review Comment:
   Did you mean to leave this commented out?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to