jedcunningham commented on code in PR #39336:
URL: https://github.com/apache/airflow/pull/39336#discussion_r1594440137
##########
airflow/models/taskinstancekey.py:
##########
@@ -37,6 +37,7 @@ def primary(self) -> tuple[str, str, str, int]:
@property
def reduced(self) -> TaskInstanceKey:
"""Remake the key by subtracting 1 from try number to match in memory
information."""
+ # todo (dstandish): remove this property
Review Comment:
todo reminder :)
##########
airflow/models/taskinstance.py:
##########
@@ -1549,53 +1519,22 @@ def init_on_load(self) -> None:
"""Initialize the attributes that aren't stored in the DB."""
self.test_mode = False # can be changed when calling 'run'
- @hybrid_property
- def try_number(self):
- """
- Return the try number that a task number will be when it is actually
run.
-
- If the TaskInstance is currently running, this will match the column
in the
- database, in all other cases this will be incremented.
-
- This is designed so that task logs end up in the right file.
- """
- return _get_try_number(task_instance=self)
-
- @try_number.expression
- def try_number(cls):
- """
- Return the expression to be used by SQLAlchemy when filtering on
try_number.
-
- This is required because the override in the get_try_number function
causes
- try_number values to be off by one when listing tasks in the UI.
-
- :meta private:
- """
- return cls._try_number
-
- @try_number.setter
- def try_number(self, value: int) -> None:
- """
- Set a task try number.
-
- :param value: the try number
- """
- _set_try_number(task_instance=self, value=value)
-
@property
+ @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
def prev_attempted_tries(self) -> int:
"""
- Calculate the number of previously attempted tries, defaulting to 0.
+ Calculate the total number of attempted tries, defaulting to 0.
+
+ This used to be necessary because try_number did not always tell the
truth.
- Expose this for the Task Tries and Gantt graph views.
- Using `try_number` throws off the counts for non-running tasks.
- Also useful in error logging contexts to get the try number for the
last try that was attempted.
+ :meta private:
"""
- return self._try_number
+ return self.try_number
@property
def next_try_number(self) -> int:
- return self._try_number + 1
+ # todo (dstandish): deprecate this property; we don't need a property
that is just + 1
Review Comment:
todo reminder :)
##########
airflow/models/taskinstance.py:
##########
@@ -1508,6 +1458,26 @@ def __init__(
def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
+ @property
+ @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
+ def _try_number(self):
+ """
+ Do not use. For semblance of backcompat.
+
+ :meta private:
+ """
+ return self.try_number
+
+ @_try_number.setter
+ @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
+ def _try_number(self, val):
+ """
+ Do not use. For semblance of backcompat.
Review Comment:
```suggestion
Do not use. For semblance of backcompat.
```
nit
##########
airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -21,13 +21,28 @@
from contextlib import suppress
from typing import TYPE_CHECKING
+try:
+ from airflow import __version__ as airflow_version
Review Comment:
This always exists in 2.7+. In fact, I think it works all the way back to 2.4 :)
##########
tests/jobs/test_backfill_job.py:
##########
@@ -1525,7 +1544,7 @@ def test_update_counters(self, dag_maker, session):
# match what's in the in-memory ti_status.running map. This is the same
# for skipped, failed and retry states.
ti_status.running[ti.key] = ti # Task is queued and marked as running
- ti._try_number += 1 # Try number is increased during ti.run()
+ ti.try_number += 1 # Try number is increased during ti.run()
Review Comment:
This comment isn't right anymore, right?
##########
newsfragments/39336.significant.rst:
##########
@@ -0,0 +1,5 @@
+``try_number`` is no longer incremented during task execution
+
+Previously, the try_number was incremented at the beginning of task execution
on the worker. This was problematic for many reasons. For one, it meant that
the try_number was incremented when it was not supposed to, namely when
resuming from reschedule or deferral. And it also resulted in the try_number
being "wrong" when the task had not yet started. The workarounds for these two
issues caused a lot of confusion. Now, instead, we only increment
the try_number when the task is scheduled by the scheduler. So the try number for a
task run is determined in advance, does not change in flight, and is
never decremented. So after the task runs, the observed try_number remains
the same as it was when the task was running; only when there is a "new try"
will the try_number be incremented again.
+
+One consequence of this change is, if users were "manually" running tasks
(e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``),
try_number will no longer be incremented. Airflow assumes that tasks are
always run after being scheduled by the scheduler, so we do not regard this as
a breaking change.
Review Comment:
```suggestion
Previously, the ``try_number`` was incremented at the beginning of task
execution on the worker. This was problematic for many reasons. For one, it
meant that the ``try_number`` was incremented when it was not supposed to,
namely when resuming from reschedule or deferral. And it also resulted in the
``try_number`` being "wrong" when the task had not yet started. The workarounds
for these two issues caused a lot of confusion. Now, instead, we only increment
the ``try_number`` when the task is scheduled by the scheduler. The
``try_number`` for a task run is determined in advance, and does not change in
flight, and it is never decremented. So after the task runs, the observed
``try_number`` remains the same as it was when the task was running; only when
there is a "new try" will the ``try_number`` be incremented again.

One consequence of this change is, if users were "manually" running tasks
(e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``),
``try_number`` will no longer be incremented. Airflow assumes that tasks are
always run after being scheduled by the scheduler, so we do not regard this as
a breaking change.
```
We could convert a few ``try_number``s to "try number" if we want less
inline code.
##########
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -527,7 +527,7 @@ def try_adopt_task_instances(self, tis:
Sequence[TaskInstance]) -> Sequence[Task
ti.queue,
ti.command_as_list(),
ti.executor_config,
- ti.prev_attempted_tries,
+ ti.try_number, # prev attempted tries
Review Comment:
```suggestion
ti.try_number,
```
Not sure we need this comment?
##########
airflow/www/views.py:
##########
@@ -5196,7 +5196,7 @@ class TaskInstanceModelView(AirflowModelView):
"pool",
"queued_by_job_id",
]
-
+ # todo: don't use prev_attempted_tries; just use try_number
Review Comment:
todo reminder :)
##########
airflow/models/taskinstance.py:
##########
@@ -1508,6 +1458,26 @@ def __init__(
def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
+ @property
+ @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
+ def _try_number(self):
+ """
+ Do not use. For semblance of backcompat.
Review Comment:
```suggestion
Do not use. For semblance of backcompat.
```
nit!
##########
tests/jobs/test_backfill_job.py:
##########
@@ -185,7 +185,7 @@ def test_trigger_controller_dag(self, session):
assert task_instances_list
- @pytest.mark.backend("postgres", "mysql")
+ # @pytest.mark.backend("postgres", "mysql")
Review Comment:
Did you mean to leave this commented out?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]