This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 615cddf427 airflow.models.taskinstance deprecations removed (#41784)
615cddf427 is described below

commit 615cddf427081bdbafc9437569946b16390deddb
Author: Gopal Dirisala <[email protected]>
AuthorDate: Tue Aug 27 20:15:44 2024 +0530

    airflow.models.taskinstance deprecations removed (#41784)
---
 airflow/models/taskinstance.py                     | 171 ---------------------
 .../amazon/aws/executors/batch/batch_executor.py   |   2 +-
 airflow/utils/log/file_task_handler.py             |   2 +-
 airflow/utils/log/log_reader.py                    |   2 +-
 newsfragments/41784.significant.rst                |  12 ++
 tests/jobs/test_triggerer_job.py                   |   2 +-
 tests/models/test_baseoperator.py                  |   6 +-
 tests/models/test_dagrun.py                        |   2 +-
 tests/providers/microsoft/conftest.py              |   5 +-
 .../deps/test_dag_ti_slots_available_dep.py        |   4 +-
 tests/ti_deps/deps/test_dag_unpaused_dep.py        |   4 +-
 tests/ti_deps/deps/test_not_in_retry_period_dep.py |   2 +-
 12 files changed, 26 insertions(+), 188 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index e2b59ebbab..1d3909aed4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -38,7 +38,6 @@ import dill
 import jinja2
 import lazy_object_proxy
 import pendulum
-from deprecated import deprecated
 from jinja2 import TemplateAssertionError, UndefinedError
 from sqlalchemy import (
     Column,
@@ -80,7 +79,6 @@ from airflow.exceptions import (
     AirflowSkipException,
     AirflowTaskTerminated,
     AirflowTaskTimeout,
-    DagRunNotFound,
     RemovedInAirflow3Warning,
     TaskDeferralError,
     TaskDeferred,
@@ -425,7 +423,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance | 
TaskInstancePydantic,
 def clear_task_instances(
     tis: list[TaskInstance],
     session: Session,
-    activate_dag_runs: None = None,
     dag: DAG | None = None,
     dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
 ) -> None:
@@ -443,7 +440,6 @@ def clear_task_instances(
     :param dag_run_state: state to set finished DagRuns to.
         If set to False, DagRuns state will not be changed.
     :param dag: DAG object
-    :param activate_dag_runs: Deprecated parameter, do not pass
     """
     job_ids = []
     # Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
@@ -521,16 +517,6 @@ def clear_task_instances(
 
         
session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
 
-    if activate_dag_runs is not None:
-        warnings.warn(
-            "`activate_dag_runs` parameter to clear_task_instances function is 
deprecated. "
-            "Please use `dag_run_state`",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        if not activate_dag_runs:
-            dag_run_state = False
-
     if dag_run_state is not False and tis:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
@@ -1922,7 +1908,6 @@ class TaskInstance(Base, LoggingMixin):
     def __init__(
         self,
         task: Operator,
-        execution_date: datetime | None = None,
         run_id: str | None = None,
         state: str | None = None,
         map_index: int = -1,
@@ -1938,42 +1923,7 @@ class TaskInstance(Base, LoggingMixin):
         # init_on_load will config the log
         self.init_on_load()
 
-        if run_id is None and execution_date is not None:
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            warnings.warn(
-                "Passing an execution_date to `TaskInstance()` is deprecated 
in favour of passing a run_id",
-                RemovedInAirflow3Warning,
-                # Stack level is 4 because SQLA adds some wrappers around the 
constructor
-                stacklevel=4,
-            )
-            # make sure we have a localized execution_date stored in UTC
-            if execution_date and not timezone.is_localized(execution_date):
-                self.log.warning(
-                    "execution date %s has no timezone information. Using 
default from dag or system",
-                    execution_date,
-                )
-                if self.task.has_dag():
-                    if TYPE_CHECKING:
-                        assert self.task.dag
-                    execution_date = timezone.make_aware(execution_date, 
self.task.dag.timezone)
-                else:
-                    execution_date = timezone.make_aware(execution_date)
-
-                execution_date = timezone.convert_to_utc(execution_date)
-            with create_session() as session:
-                run_id = (
-                    session.query(DagRun.run_id)
-                    .filter_by(dag_id=self.dag_id, 
execution_date=execution_date)
-                    .scalar()
-                )
-                if run_id is None:
-                    raise DagRunNotFound(
-                        f"DagRun for {self.dag_id!r} with date 
{execution_date} not found"
-                    ) from None
-
         self.run_id = run_id
-
         self.try_number = 0
         self.max_tries = self.task.retries
         self.unixname = getuser()
@@ -1989,26 +1939,6 @@ class TaskInstance(Base, LoggingMixin):
     def __hash__(self):
         return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
 
-    @property
-    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
-    def _try_number(self):
-        """
-        Do not use. For semblance of backcompat.
-
-        :meta private:
-        """
-        return self.try_number
-
-    @_try_number.setter
-    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
-    def _try_number(self, val):
-        """
-        Do not use. For semblance of backcompat.
-
-        :meta private:
-        """
-        self.try_number = val
-
     @property
     def stats_tags(self) -> dict[str, str]:
         """Returns task instance tags."""
@@ -2051,23 +1981,6 @@ class TaskInstance(Base, LoggingMixin):
         """Initialize the attributes that aren't stored in the DB."""
         self.test_mode = False  # can be changed when calling 'run'
 
-    @property
-    @deprecated(reason="Use try_number instead.", version="2.10.0", 
category=RemovedInAirflow3Warning)
-    def prev_attempted_tries(self) -> int:
-        """
-        Calculate the total number of attempted tries, defaulting to 0.
-
-        This used to be necessary because try_number did not always tell the 
truth.
-
-        :meta private:
-        """
-        return self.try_number
-
-    @property
-    def next_try_number(self) -> int:
-        # todo (dstandish): deprecate this property; we don't need a property 
that is just + 1
-        return self.try_number + 1
-
     @property
     def operator_name(self) -> str | None:
         """@property: use a more friendly display name for the operator, if 
set."""
@@ -2498,40 +2411,6 @@ class TaskInstance(Base, LoggingMixin):
         """
         return _get_previous_ti(task_instance=self, state=state, 
session=session)
 
-    @property
-    def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None:
-        """
-        This attribute is deprecated.
-
-        Please use 
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
-        """
-        warnings.warn(
-            """
-            This attribute is deprecated.
-            Please use 
`airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
-            """,
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_previous_ti()
-
-    @property
-    def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic | 
None:
-        """
-        This attribute is deprecated.
-
-        Please use 
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
-        """
-        warnings.warn(
-            """
-            This attribute is deprecated.
-            Please use 
`airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
-            """,
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_previous_ti(state=DagRunState.SUCCESS)
-
     @provide_session
     def get_previous_execution_date(
         self,
@@ -2558,23 +2437,6 @@ class TaskInstance(Base, LoggingMixin):
         """
         return _get_previous_start_date(task_instance=self, state=state, 
session=session)
 
-    @property
-    def previous_start_date_success(self) -> pendulum.DateTime | None:
-        """
-        This attribute is deprecated.
-
-        Please use 
:class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
-        """
-        warnings.warn(
-            """
-            This attribute is deprecated.
-            Please use 
`airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
-            """,
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        return self.get_previous_start_date(state=DagRunState.SUCCESS)
-
     @provide_session
     def are_dependencies_met(
         self, dep_context: DepContext | None = None, session: Session = 
NEW_SESSION, verbose: bool = False
@@ -4115,21 +3977,6 @@ class SimpleTaskInstance:
             return self.__dict__ == other.__dict__
         return NotImplemented
 
-    def as_dict(self):
-        warnings.warn(
-            "This method is deprecated. Use BaseSerialization.serialize.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        new_dict = dict(self.__dict__)
-        for key in new_dict:
-            if key in ["start_date", "end_date"]:
-                val = new_dict[key]
-                if not val or isinstance(val, str):
-                    continue
-                new_dict.update({key: val.isoformat()})
-        return new_dict
-
     @classmethod
     def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
         return cls(
@@ -4150,24 +3997,6 @@ class SimpleTaskInstance:
             priority_weight=ti.priority_weight if hasattr(ti, 
"priority_weight") else None,
         )
 
-    @classmethod
-    def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance:
-        warnings.warn(
-            "This method is deprecated. Use BaseSerialization.deserialize.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        ti_key = TaskInstanceKey(*obj_dict.pop("key"))
-        start_date = None
-        end_date = None
-        start_date_str: str | None = obj_dict.pop("start_date")
-        end_date_str: str | None = obj_dict.pop("end_date")
-        if start_date_str:
-            start_date = timezone.parse(start_date_str)
-        if end_date_str:
-            end_date = timezone.parse(end_date_str)
-        return cls(**obj_dict, start_date=start_date, end_date=end_date, 
key=ti_key)
-
 
 class TaskInstanceNote(TaskInstanceDependencies):
     """For storage of arbitrary notes concerning the task instance."""
diff --git a/airflow/providers/amazon/aws/executors/batch/batch_executor.py 
b/airflow/providers/amazon/aws/executors/batch/batch_executor.py
index 92790eb6c2..b38688defb 100644
--- a/airflow/providers/amazon/aws/executors/batch/batch_executor.py
+++ b/airflow/providers/amazon/aws/executors/batch/batch_executor.py
@@ -448,7 +448,7 @@ class AwsBatchExecutor(BaseExecutor):
                         airflow_cmd=ti.command_as_list(),
                         queue=ti.queue,
                         exec_config=ti.executor_config,
-                        attempt_number=ti.prev_attempted_tries,
+                        attempt_number=ti.try_number,
                     )
                     adopted_tis.append(ti)
 
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index eed196e8d8..24379d340a 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -462,7 +462,7 @@ class FileTaskHandler(logging.Handler):
         # try number gets incremented in DB, i.e logs produced the time
         # after cli run and before try_number + 1 in DB will not be displayed.
         if try_number is None:
-            next_try = task_instance.next_try_number
+            next_try = task_instance.try_number + 1
             try_numbers = list(range(1, next_try))
         elif try_number < 1:
             logs = [
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index ad61a13908..c99efc350b 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -74,7 +74,7 @@ class TaskLogReader:
         :param metadata: A dictionary containing information about how to read 
the task log
         """
         if try_number is None:
-            next_try = ti.next_try_number
+            next_try = ti.try_number + 1
             try_numbers = list(range(1, next_try))
         else:
             try_numbers = [try_number]
diff --git a/newsfragments/41784.significant.rst 
b/newsfragments/41784.significant.rst
new file mode 100644
index 0000000000..d0d8a07cd0
--- /dev/null
+++ b/newsfragments/41784.significant.rst
@@ -0,0 +1,12 @@
+Removed a set of deprecations in from ``airflow.models.taskinstance``.
+
+ - Removed deprecated arg ``activate_dag_runs`` from 
``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead.
+ - Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``. 
Please use ``run_id`` instead.
+ - Removed deprecated property ``_try_number`` from ``TaskInstance``. Please 
use ``try_number`` instead.
+ - Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. 
Please use ``try_number`` instead.
+ - Removed deprecated property ``next_try_number`` from ``TaskInstance``. 
Please use ``try_number + 1`` instead.
+ - Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please 
use ``get_previous_ti`` instead.
+ - Removed deprecated property ``previous_ti_success`` from ``TaskInstance``. 
Please use ``get_previous_ti`` instead.
+ - Removed deprecated property ``previous_start_date_success`` from 
``TaskInstance``. Please use ``get_previous_start_date`` instead.
+ - Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please 
use ``BaseSerialization.serialize`` instead.
+ - Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``. 
Please use ``BaseSerialization.deserialize`` instead.
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 378afa0499..84b422342e 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None):
         operator.dag = dag
     else:
         operator = BaseOperator(task_id="test_ti", dag=dag)
-    task_instance = TaskInstance(operator, execution_date=run.execution_date, 
run_id=run.run_id)
+    task_instance = TaskInstance(operator, run_id=run.run_id)
     task_instance.trigger_id = trigger_orm.id
     session.add(dag_model)
     session.add(run)
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index d645b221a5..2aa5b76b22 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -1094,11 +1094,11 @@ def test_get_task_instances(session):
         "run_type": DagRunType.MANUAL,
     }
     dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1", 
**common_dr_kwargs)
-    ti_1 = TaskInstance(run_id=dr1.run_id, task=task, 
execution_date=first_execution_date)
+    ti_1 = TaskInstance(run_id=dr1.run_id, task=task)
     dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2", 
**common_dr_kwargs)
-    ti_2 = TaskInstance(run_id=dr2.run_id, task=task, 
execution_date=second_execution_date)
+    ti_2 = TaskInstance(run_id=dr2.run_id, task=task)
     dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3", 
**common_dr_kwargs)
-    ti_3 = TaskInstance(run_id=dr3.run_id, task=task, 
execution_date=third_execution_date)
+    ti_3 = TaskInstance(run_id=dr3.run_id, task=task)
     session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3])
     session.commit()
 
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index fd0647577a..2c167a542d 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1422,7 +1422,7 @@ def test_mapped_literal_faulty_state_in_db(dag_maker, 
session):
     assert len(decision.schedulable_tis) == 2
 
     # We insert a faulty record
-    session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date, 
dr.run_id))
+    session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id))
     session.flush()
 
     decision = dr.task_instance_scheduling_decisions()
diff --git a/tests/providers/microsoft/conftest.py 
b/tests/providers/microsoft/conftest.py
index b18a2cb1fe..c77dd7747d 100644
--- a/tests/providers/microsoft/conftest.py
+++ b/tests/providers/microsoft/conftest.py
@@ -124,14 +124,11 @@ def mock_context(task) -> Context:
         def __init__(
             self,
             task,
-            execution_date: datetime | None = None,
             run_id: str | None = "run_id",
             state: str | None = TaskInstanceState.RUNNING,
             map_index: int = -1,
         ):
-            super().__init__(
-                task=task, execution_date=execution_date, run_id=run_id, 
state=state, map_index=map_index
-            )
+            super().__init__(task=task, run_id=run_id, state=state, 
map_index=map_index)
             self.values: dict[str, Any] = {}
 
         def xcom_pull(
diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py 
b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
index 1deff072bd..b52a6f8e93 100644
--- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
+++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
@@ -34,7 +34,7 @@ class TestDagTISlotsAvailableDep:
         """
         dag = Mock(concurrency=1, 
get_concurrency_reached=Mock(return_value=True))
         task = Mock(dag=dag, pool_slots=1)
-        ti = TaskInstance(task, execution_date=None)
+        ti = TaskInstance(task)
 
         assert not DagTISlotsAvailableDep().is_met(ti=ti)
 
@@ -44,6 +44,6 @@ class TestDagTISlotsAvailableDep:
         """
         dag = Mock(concurrency=1, 
get_concurrency_reached=Mock(return_value=False))
         task = Mock(dag=dag, pool_slots=1)
-        ti = TaskInstance(task, execution_date=None)
+        ti = TaskInstance(task)
 
         assert DagTISlotsAvailableDep().is_met(ti=ti)
diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py 
b/tests/ti_deps/deps/test_dag_unpaused_dep.py
index e3f740a54c..576c4277eb 100644
--- a/tests/ti_deps/deps/test_dag_unpaused_dep.py
+++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py
@@ -34,7 +34,7 @@ class TestDagUnpausedDep:
         """
         dag = Mock(**{"get_is_paused.return_value": True})
         task = Mock(dag=dag)
-        ti = TaskInstance(task=task, execution_date=None)
+        ti = TaskInstance(task=task)
 
         assert not DagUnpausedDep().is_met(ti=ti)
 
@@ -44,6 +44,6 @@ class TestDagUnpausedDep:
         """
         dag = Mock(**{"get_is_paused.return_value": False})
         task = Mock(dag=dag)
-        ti = TaskInstance(task=task, execution_date=None)
+        ti = TaskInstance(task=task)
 
         assert DagUnpausedDep().is_met(ti=ti)
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py 
b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
index 17736abbf7..1b0de7c991 100644
--- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -34,7 +34,7 @@ pytestmark = [pytest.mark.db_test, 
pytest.mark.skip_if_database_isolation_mode]
 class TestNotInRetryPeriodDep:
     def _get_task_instance(self, state, end_date=None, 
retry_delay=timedelta(minutes=15)):
         task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
-        ti = TaskInstance(task=task, state=state, execution_date=None)
+        ti = TaskInstance(task=task, state=state)
         ti.end_date = end_date
         return ti
 

Reply via email to