This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 615cddf427 airflow.models.taskinstance deprecations removed (#41784)
615cddf427 is described below
commit 615cddf427081bdbafc9437569946b16390deddb
Author: Gopal Dirisala <[email protected]>
AuthorDate: Tue Aug 27 20:15:44 2024 +0530
airflow.models.taskinstance deprecations removed (#41784)
---
airflow/models/taskinstance.py | 171 ---------------------
.../amazon/aws/executors/batch/batch_executor.py | 2 +-
airflow/utils/log/file_task_handler.py | 2 +-
airflow/utils/log/log_reader.py | 2 +-
newsfragments/41784.significant.rst | 12 ++
tests/jobs/test_triggerer_job.py | 2 +-
tests/models/test_baseoperator.py | 6 +-
tests/models/test_dagrun.py | 2 +-
tests/providers/microsoft/conftest.py | 5 +-
.../deps/test_dag_ti_slots_available_dep.py | 4 +-
tests/ti_deps/deps/test_dag_unpaused_dep.py | 4 +-
tests/ti_deps/deps/test_not_in_retry_period_dep.py | 2 +-
12 files changed, 26 insertions(+), 188 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index e2b59ebbab..1d3909aed4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -38,7 +38,6 @@ import dill
import jinja2
import lazy_object_proxy
import pendulum
-from deprecated import deprecated
from jinja2 import TemplateAssertionError, UndefinedError
from sqlalchemy import (
Column,
@@ -80,7 +79,6 @@ from airflow.exceptions import (
AirflowSkipException,
AirflowTaskTerminated,
AirflowTaskTimeout,
- DagRunNotFound,
RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
@@ -425,7 +423,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance |
TaskInstancePydantic,
def clear_task_instances(
tis: list[TaskInstance],
session: Session,
- activate_dag_runs: None = None,
dag: DAG | None = None,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
) -> None:
@@ -443,7 +440,6 @@ def clear_task_instances(
:param dag_run_state: state to set finished DagRuns to.
If set to False, DagRuns state will not be changed.
:param dag: DAG object
- :param activate_dag_runs: Deprecated parameter, do not pass
"""
job_ids = []
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
@@ -521,16 +517,6 @@ def clear_task_instances(
session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
- if activate_dag_runs is not None:
- warnings.warn(
- "`activate_dag_runs` parameter to clear_task_instances function is
deprecated. "
- "Please use `dag_run_state`",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- if not activate_dag_runs:
- dag_run_state = False
-
if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import
@@ -1922,7 +1908,6 @@ class TaskInstance(Base, LoggingMixin):
def __init__(
self,
task: Operator,
- execution_date: datetime | None = None,
run_id: str | None = None,
state: str | None = None,
map_index: int = -1,
@@ -1938,42 +1923,7 @@ class TaskInstance(Base, LoggingMixin):
# init_on_load will config the log
self.init_on_load()
- if run_id is None and execution_date is not None:
- from airflow.models.dagrun import DagRun # Avoid circular import
-
- warnings.warn(
- "Passing an execution_date to `TaskInstance()` is deprecated
in favour of passing a run_id",
- RemovedInAirflow3Warning,
- # Stack level is 4 because SQLA adds some wrappers around the
constructor
- stacklevel=4,
- )
- # make sure we have a localized execution_date stored in UTC
- if execution_date and not timezone.is_localized(execution_date):
- self.log.warning(
- "execution date %s has no timezone information. Using
default from dag or system",
- execution_date,
- )
- if self.task.has_dag():
- if TYPE_CHECKING:
- assert self.task.dag
- execution_date = timezone.make_aware(execution_date,
self.task.dag.timezone)
- else:
- execution_date = timezone.make_aware(execution_date)
-
- execution_date = timezone.convert_to_utc(execution_date)
- with create_session() as session:
- run_id = (
- session.query(DagRun.run_id)
- .filter_by(dag_id=self.dag_id,
execution_date=execution_date)
- .scalar()
- )
- if run_id is None:
- raise DagRunNotFound(
- f"DagRun for {self.dag_id!r} with date
{execution_date} not found"
- ) from None
-
self.run_id = run_id
-
self.try_number = 0
self.max_tries = self.task.retries
self.unixname = getuser()
@@ -1989,26 +1939,6 @@ class TaskInstance(Base, LoggingMixin):
def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
- @property
- @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
- def _try_number(self):
- """
- Do not use. For semblance of backcompat.
-
- :meta private:
- """
- return self.try_number
-
- @_try_number.setter
- @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
- def _try_number(self, val):
- """
- Do not use. For semblance of backcompat.
-
- :meta private:
- """
- self.try_number = val
-
@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
@@ -2051,23 +1981,6 @@ class TaskInstance(Base, LoggingMixin):
"""Initialize the attributes that aren't stored in the DB."""
self.test_mode = False # can be changed when calling 'run'
- @property
- @deprecated(reason="Use try_number instead.", version="2.10.0",
category=RemovedInAirflow3Warning)
- def prev_attempted_tries(self) -> int:
- """
- Calculate the total number of attempted tries, defaulting to 0.
-
- This used to be necessary because try_number did not always tell the
truth.
-
- :meta private:
- """
- return self.try_number
-
- @property
- def next_try_number(self) -> int:
- # todo (dstandish): deprecate this property; we don't need a property
that is just + 1
- return self.try_number + 1
-
@property
def operator_name(self) -> str | None:
"""@property: use a more friendly display name for the operator, if
set."""
@@ -2498,40 +2411,6 @@ class TaskInstance(Base, LoggingMixin):
"""
return _get_previous_ti(task_instance=self, state=state,
session=session)
- @property
- def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None:
- """
- This attribute is deprecated.
-
- Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
- """
- warnings.warn(
- """
- This attribute is deprecated.
- Please use
`airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
- """,
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_previous_ti()
-
- @property
- def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic |
None:
- """
- This attribute is deprecated.
-
- Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
- """
- warnings.warn(
- """
- This attribute is deprecated.
- Please use
`airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
- """,
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_previous_ti(state=DagRunState.SUCCESS)
-
@provide_session
def get_previous_execution_date(
self,
@@ -2558,23 +2437,6 @@ class TaskInstance(Base, LoggingMixin):
"""
return _get_previous_start_date(task_instance=self, state=state,
session=session)
- @property
- def previous_start_date_success(self) -> pendulum.DateTime | None:
- """
- This attribute is deprecated.
-
- Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
- """
- warnings.warn(
- """
- This attribute is deprecated.
- Please use
`airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
- """,
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- return self.get_previous_start_date(state=DagRunState.SUCCESS)
-
@provide_session
def are_dependencies_met(
self, dep_context: DepContext | None = None, session: Session =
NEW_SESSION, verbose: bool = False
@@ -4115,21 +3977,6 @@ class SimpleTaskInstance:
return self.__dict__ == other.__dict__
return NotImplemented
- def as_dict(self):
- warnings.warn(
- "This method is deprecated. Use BaseSerialization.serialize.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- new_dict = dict(self.__dict__)
- for key in new_dict:
- if key in ["start_date", "end_date"]:
- val = new_dict[key]
- if not val or isinstance(val, str):
- continue
- new_dict.update({key: val.isoformat()})
- return new_dict
-
@classmethod
def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
return cls(
@@ -4150,24 +3997,6 @@ class SimpleTaskInstance:
priority_weight=ti.priority_weight if hasattr(ti,
"priority_weight") else None,
)
- @classmethod
- def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance:
- warnings.warn(
- "This method is deprecated. Use BaseSerialization.deserialize.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- ti_key = TaskInstanceKey(*obj_dict.pop("key"))
- start_date = None
- end_date = None
- start_date_str: str | None = obj_dict.pop("start_date")
- end_date_str: str | None = obj_dict.pop("end_date")
- if start_date_str:
- start_date = timezone.parse(start_date_str)
- if end_date_str:
- end_date = timezone.parse(end_date_str)
- return cls(**obj_dict, start_date=start_date, end_date=end_date,
key=ti_key)
-
class TaskInstanceNote(TaskInstanceDependencies):
"""For storage of arbitrary notes concerning the task instance."""
diff --git a/airflow/providers/amazon/aws/executors/batch/batch_executor.py
b/airflow/providers/amazon/aws/executors/batch/batch_executor.py
index 92790eb6c2..b38688defb 100644
--- a/airflow/providers/amazon/aws/executors/batch/batch_executor.py
+++ b/airflow/providers/amazon/aws/executors/batch/batch_executor.py
@@ -448,7 +448,7 @@ class AwsBatchExecutor(BaseExecutor):
airflow_cmd=ti.command_as_list(),
queue=ti.queue,
exec_config=ti.executor_config,
- attempt_number=ti.prev_attempted_tries,
+ attempt_number=ti.try_number,
)
adopted_tis.append(ti)
diff --git a/airflow/utils/log/file_task_handler.py
b/airflow/utils/log/file_task_handler.py
index eed196e8d8..24379d340a 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -462,7 +462,7 @@ class FileTaskHandler(logging.Handler):
# try number gets incremented in DB, i.e logs produced the time
# after cli run and before try_number + 1 in DB will not be displayed.
if try_number is None:
- next_try = task_instance.next_try_number
+ next_try = task_instance.try_number + 1
try_numbers = list(range(1, next_try))
elif try_number < 1:
logs = [
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index ad61a13908..c99efc350b 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -74,7 +74,7 @@ class TaskLogReader:
:param metadata: A dictionary containing information about how to read
the task log
"""
if try_number is None:
- next_try = ti.next_try_number
+ next_try = ti.try_number + 1
try_numbers = list(range(1, next_try))
else:
try_numbers = [try_number]
diff --git a/newsfragments/41784.significant.rst
b/newsfragments/41784.significant.rst
new file mode 100644
index 0000000000..d0d8a07cd0
--- /dev/null
+++ b/newsfragments/41784.significant.rst
@@ -0,0 +1,12 @@
+Removed a set of deprecations in from ``airflow.models.taskinstance``.
+
+ - Removed deprecated arg ``activate_dag_runs`` from
``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead.
+ - Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``.
Please use ``run_id`` instead.
+ - Removed deprecated property ``_try_number`` from ``TaskInstance``. Please
use ``try_number`` instead.
+ - Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``.
Please use ``try_number`` instead.
+ - Removed deprecated property ``next_try_number`` from ``TaskInstance``.
Please use ``try_number + 1`` instead.
+ - Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please
use ``get_previous_ti`` instead.
+ - Removed deprecated property ``previous_ti_success`` from ``TaskInstance``.
Please use ``get_previous_ti`` instead.
+ - Removed deprecated property ``previous_start_date_success`` from
``TaskInstance``. Please use ``get_previous_start_date`` instead.
+ - Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please
use ``BaseSerialization.serialize`` instead.
+ - Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``.
Please use ``BaseSerialization.deserialize`` instead.
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 378afa0499..84b422342e 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None):
operator.dag = dag
else:
operator = BaseOperator(task_id="test_ti", dag=dag)
- task_instance = TaskInstance(operator, execution_date=run.execution_date,
run_id=run.run_id)
+ task_instance = TaskInstance(operator, run_id=run.run_id)
task_instance.trigger_id = trigger_orm.id
session.add(dag_model)
session.add(run)
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index d645b221a5..2aa5b76b22 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -1094,11 +1094,11 @@ def test_get_task_instances(session):
"run_type": DagRunType.MANUAL,
}
dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1",
**common_dr_kwargs)
- ti_1 = TaskInstance(run_id=dr1.run_id, task=task,
execution_date=first_execution_date)
+ ti_1 = TaskInstance(run_id=dr1.run_id, task=task)
dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2",
**common_dr_kwargs)
- ti_2 = TaskInstance(run_id=dr2.run_id, task=task,
execution_date=second_execution_date)
+ ti_2 = TaskInstance(run_id=dr2.run_id, task=task)
dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3",
**common_dr_kwargs)
- ti_3 = TaskInstance(run_id=dr3.run_id, task=task,
execution_date=third_execution_date)
+ ti_3 = TaskInstance(run_id=dr3.run_id, task=task)
session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3])
session.commit()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index fd0647577a..2c167a542d 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1422,7 +1422,7 @@ def test_mapped_literal_faulty_state_in_db(dag_maker,
session):
assert len(decision.schedulable_tis) == 2
# We insert a faulty record
- session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date,
dr.run_id))
+ session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id))
session.flush()
decision = dr.task_instance_scheduling_decisions()
diff --git a/tests/providers/microsoft/conftest.py
b/tests/providers/microsoft/conftest.py
index b18a2cb1fe..c77dd7747d 100644
--- a/tests/providers/microsoft/conftest.py
+++ b/tests/providers/microsoft/conftest.py
@@ -124,14 +124,11 @@ def mock_context(task) -> Context:
def __init__(
self,
task,
- execution_date: datetime | None = None,
run_id: str | None = "run_id",
state: str | None = TaskInstanceState.RUNNING,
map_index: int = -1,
):
- super().__init__(
- task=task, execution_date=execution_date, run_id=run_id,
state=state, map_index=map_index
- )
+ super().__init__(task=task, run_id=run_id, state=state,
map_index=map_index)
self.values: dict[str, Any] = {}
def xcom_pull(
diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
index 1deff072bd..b52a6f8e93 100644
--- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
+++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
@@ -34,7 +34,7 @@ class TestDagTISlotsAvailableDep:
"""
dag = Mock(concurrency=1,
get_concurrency_reached=Mock(return_value=True))
task = Mock(dag=dag, pool_slots=1)
- ti = TaskInstance(task, execution_date=None)
+ ti = TaskInstance(task)
assert not DagTISlotsAvailableDep().is_met(ti=ti)
@@ -44,6 +44,6 @@ class TestDagTISlotsAvailableDep:
"""
dag = Mock(concurrency=1,
get_concurrency_reached=Mock(return_value=False))
task = Mock(dag=dag, pool_slots=1)
- ti = TaskInstance(task, execution_date=None)
+ ti = TaskInstance(task)
assert DagTISlotsAvailableDep().is_met(ti=ti)
diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py
b/tests/ti_deps/deps/test_dag_unpaused_dep.py
index e3f740a54c..576c4277eb 100644
--- a/tests/ti_deps/deps/test_dag_unpaused_dep.py
+++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py
@@ -34,7 +34,7 @@ class TestDagUnpausedDep:
"""
dag = Mock(**{"get_is_paused.return_value": True})
task = Mock(dag=dag)
- ti = TaskInstance(task=task, execution_date=None)
+ ti = TaskInstance(task=task)
assert not DagUnpausedDep().is_met(ti=ti)
@@ -44,6 +44,6 @@ class TestDagUnpausedDep:
"""
dag = Mock(**{"get_is_paused.return_value": False})
task = Mock(dag=dag)
- ti = TaskInstance(task=task, execution_date=None)
+ ti = TaskInstance(task=task)
assert DagUnpausedDep().is_met(ti=ti)
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
index 17736abbf7..1b0de7c991 100644
--- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -34,7 +34,7 @@ pytestmark = [pytest.mark.db_test,
pytest.mark.skip_if_database_isolation_mode]
class TestNotInRetryPeriodDep:
def _get_task_instance(self, state, end_date=None,
retry_delay=timedelta(minutes=15)):
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
- ti = TaskInstance(task=task, state=state, execution_date=None)
+ ti = TaskInstance(task=task, state=state)
ti.end_date = end_date
return ti