This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bb8d465f221 Clean Leftovers of RemovedInAirflow3Warning (#47264)
bb8d465f221 is described below
commit bb8d465f221864e4fd84ee5ed5b0bbb524c95d50
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Mar 5 15:22:19 2025 +0100
Clean Leftovers of RemovedInAirflow3Warning (#47264)
* Clean Leftovers of RemovedInAirflow3Warning
* Add newsfragment for deprecations
* Revert one cleanup in decorator
* Replace further leftovers of airflow.api.auth.backend.session in code
* Review feedback
---
Dockerfile.ci | 2 +-
airflow/api/auth/backend/session.py | 38 ----------------
airflow/configuration.py | 9 ++--
airflow/exceptions.py | 7 ---
airflow/executors/base_executor.py | 20 ---------
airflow/jobs/scheduler_job_runner.py | 32 +-------------
clients/python/README.md | 4 +-
clients/python/test_python_client.py | 2 +-
contributing-docs/testing/unit_tests.rst | 1 -
dev/README_RELEASE_PYTHON_CLIENT.md | 2 +-
.../core-concepts/executor/index.rst | 1 -
.../howto/docker-compose/docker-compose.yaml | 2 +-
newsfragments/47264.significant.rst | 16 +++++++
providers/edge/docs/edge_executor.rst | 1 -
.../fab/docs/auth-manager/api-authentication.rst | 2 +-
pyproject.toml | 1 -
scripts/ci/pre_commit/check_deprecations.py | 1 -
scripts/ci/testing/summarize_captured_warnings.py | 1 -
scripts/docker/entrypoint_ci.sh | 2 +-
tests/always/test_example_dags.py | 8 ----
tests/core/test_configuration.py | 2 +-
tests/jobs/test_scheduler_job.py | 51 ----------------------
tests/serialization/test_dag_serialization.py | 11 +----
23 files changed, 30 insertions(+), 186 deletions(-)
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 530cabe466e..bb778684130 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -1021,7 +1021,7 @@ function start_api_server_with_examples(){
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
- export
AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
+ export
AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
echo
echo "${COLOR_BLUE}Initializing database${COLOR_RESET}"
diff --git a/airflow/api/auth/backend/session.py
b/airflow/api/auth/backend/session.py
deleted file mode 100644
index e6b4da69121..00000000000
--- a/airflow/api/auth/backend/session.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Session authentication backend."""
-
-from __future__ import annotations
-
-import warnings
-from typing import Any
-
-import airflow.providers.fab.auth_manager.api.auth.backend.session as
fab_session
-from airflow.exceptions import RemovedInAirflow3Warning
-
-CLIENT_AUTH: tuple[str, str] | Any | None = None
-
-
-warnings.warn(
- "This module is deprecated. Please use
`airflow.providers.fab.auth_manager.api.auth.backend.session` instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
-)
-
-
-init_app = fab_session.init_app
-requires_authentication = fab_session.requires_authentication
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 969b29d9405..1fdcf852771 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -378,7 +378,7 @@ class AirflowConfigParser(ConfigParser):
"api": {
"auth_backends": (
re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
- "airflow.api.auth.backend.session",
+ "airflow.providers.fab.auth_manager.api.auth.backend.session",
"3.0",
),
},
@@ -686,10 +686,7 @@ class AirflowConfigParser(ConfigParser):
This is required by the UI for ajax queries.
"""
old_value = self.get("api", "auth_backends", fallback="")
- if (
- old_value.find("airflow.api.auth.backend.session") == -1
- and
old_value.find("airflow.providers.fab.auth_manager.api.auth.backend.session")
== -1
- ):
+ if "airflow.providers.fab.auth_manager.api.auth.backend.session" not
in old_value:
new_value = old_value +
",airflow.providers.fab.auth_manager.api.auth.backend.session"
self._update_env_var(section="api", name="auth_backends",
new_value=new_value)
self.upgraded_values[("api", "auth_backends")] = old_value
@@ -700,7 +697,7 @@ class AirflowConfigParser(ConfigParser):
os.environ.pop(old_env_var, None)
warnings.warn(
- "The auth_backends setting in [api] has had
airflow.api.auth.backend.session added "
+ "The auth_backends setting in [api] missed
airflow.providers.fab.auth_manager.api.auth.backend.session "
"in the running config, which is needed by the UI. Please
update your config before "
"Apache Airflow 3.0.",
FutureWarning,
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index c00d2aea535..ca0eb6509eb 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -475,13 +475,6 @@ except ImportError:
"""Raised when an error is encountered while trying to merge pod
configs."""
-class RemovedInAirflow3Warning(DeprecationWarning):
- """Issued for usage of deprecated features that will be removed in
Airflow3."""
-
- deprecated_since: str | None = None
- "Indicates the airflow version that started raising this deprecation
warning"
-
-
class RemovedInAirflow4Warning(DeprecationWarning):
"""Issued for usage of deprecated features that will be removed in
Airflow4."""
diff --git a/airflow/executors/base_executor.py
b/airflow/executors/base_executor.py
index 765623d8c94..f1b6b936f9f 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -26,11 +26,9 @@ from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional
import pendulum
-from deprecated import deprecated
from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
-from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
@@ -587,24 +585,6 @@ class BaseExecutor(LoggingMixin):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError
- @deprecated(
- reason="Replaced by function `revoke_task`.",
- category=RemovedInAirflow3Warning,
- action="ignore",
- )
- def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
- """
- Handle remnants of tasks that were failed because they were stuck in
queued.
-
- Tasks can get stuck in queued. If such a task is detected, it will be
marked
- as `UP_FOR_RETRY` if the task instance has remaining retries or marked
as `FAILED`
- if it doesn't.
-
- :param tis: List of Task Instances to clean up
- :return: List of readable task instances for a warning message
- """
- raise NotImplementedError
-
def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 6cb90e46540..e83e3808aa5 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -26,13 +26,12 @@ import sys
import time
from collections import Counter, defaultdict, deque
from collections.abc import Collection, Iterable, Iterator
-from contextlib import ExitStack, suppress
+from contextlib import ExitStack
from datetime import date, timedelta
from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any, Callable
-from deprecated import deprecated
from sqlalchemy import and_, delete, exists, func, select, text, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
@@ -42,7 +41,6 @@ from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest,
TaskCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
-from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_loader import ExecutorLoader
@@ -1727,12 +1725,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
session.commit()
except NotImplementedError:
- # this block only gets entered if the executor has not
implemented `revoke_task`.
- # in which case, we try the fallback logic
- # todo: remove the call to _stuck_in_queued_backcompat_logic
in airflow 3.0.
- # after 3.0, `cleanup_stuck_queued_tasks` will be removed,
so we should
- # just continue immediately.
- self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue
def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
@@ -1779,28 +1771,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
ti.set_state(TaskInstanceState.FAILED, session=session)
- @deprecated(
- reason="This is backcompat layer for older executor interface. Should
be removed in 3.0",
- category=RemovedInAirflow3Warning,
- action="ignore",
- )
- def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
- """
- Try to invoke stuck in queued cleanup for older executor interface.
-
- TODO: remove in airflow 3.0
-
- Here we handle case where the executor pre-dates the interface change
that
- introduced `cleanup_tasks_stuck_in_queued` and deprecated
`cleanup_stuck_queued_tasks`.
-
- """
- with suppress(NotImplementedError):
- for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
- self.log.warning(
- "Task instance %s stuck in queued. Will be set to failed.",
- ti_repr,
- )
-
def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
session.execute(
update(TI)
diff --git a/clients/python/README.md b/clients/python/README.md
index 7bba821e43e..07ae5c81b3a 100644
--- a/clients/python/README.md
+++ b/clients/python/README.md
@@ -534,11 +534,11 @@ that uses the API to run the tests. To do that, you need
to:
```ini
[api]
-auth_backend =
airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
+auth_backend =
airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
```
You can also set it by env variable:
-`export
AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth`
+`export
AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth`
* configure your airflow webserver to load example dags
In the `[core]` section of your `airflow.cfg` set:
diff --git a/clients/python/test_python_client.py
b/clients/python/test_python_client.py
index 7b103210b74..80759cef3ca 100644
--- a/clients/python/test_python_client.py
+++ b/clients/python/test_python_client.py
@@ -51,7 +51,7 @@ from airflow_client.client.model.dag_run import DAGRun
# configured also with the basic_auth as backend additionally to regular
session backend needed
# by the UI. In the `[api]` section of your `airflow.cfg` set:
#
-# auth_backend =
airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
+# auth_backend =
airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
#
# Make sure that your user/name are configured properly - using the
user/password that has admin
# privileges in Airflow
diff --git a/contributing-docs/testing/unit_tests.rst
b/contributing-docs/testing/unit_tests.rst
index 442d22ed78d..1203e2b29c2 100644
--- a/contributing-docs/testing/unit_tests.rst
+++ b/contributing-docs/testing/unit_tests.rst
@@ -42,7 +42,6 @@ Handling warnings
By default, in the new tests selected warnings are prohibited:
* ``airflow.exceptions.AirflowProviderDeprecationWarning``
-* ``airflow.exceptions.RemovedInAirflow3Warning``
That mean if one of this warning appear during test run and do not captured
the test will failed.
diff --git a/dev/README_RELEASE_PYTHON_CLIENT.md
b/dev/README_RELEASE_PYTHON_CLIENT.md
index a3e743420ab..77b2266affa 100644
--- a/dev/README_RELEASE_PYTHON_CLIENT.md
+++ b/dev/README_RELEASE_PYTHON_CLIENT.md
@@ -477,7 +477,7 @@ and allows you to test the client in a real environment.
variable in `files/airflow-breeze-config/init.sh`:
```shell
-export
AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
+export
AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
```
diff --git a/docs/apache-airflow/core-concepts/executor/index.rst
b/docs/apache-airflow/core-concepts/executor/index.rst
index cb6384d129c..93737af5dce 100644
--- a/docs/apache-airflow/core-concepts/executor/index.rst
+++ b/docs/apache-airflow/core-concepts/executor/index.rst
@@ -260,7 +260,6 @@ The following methods aren't required to override to have a
functional Airflow e
* ``start``: The Airflow scheduler job will call this method after it
initializes the executor object. Any additional setup required by the executor
can be completed here.
* ``end``: The Airflow scheduler job will call this method as it is tearing
down. Any synchronous cleanup required to finish running jobs should be done
here.
* ``terminate``: More forcefully stop the executor, even killing/stopping
in-flight tasks instead of synchronously waiting for completion.
-* ``cleanup_stuck_queued_tasks``: If tasks are stuck in the queued state for
longer than ``task_queued_timeout`` then they are collected by the scheduler
and provided to the executor to have an opportunity to handle them (perform any
graceful cleanup/teardown) via this method and return the Task Instances for a
warning message displayed to users.
* ``try_adopt_task_instances``: Tasks that have been abandoned (e.g. from a
scheduler job that died) are provided to the executor to adopt or otherwise
handle them via this method. Any tasks that cannot be adopted (by default the
BaseExecutor assumes all cannot be adopted) should be returned.
* ``get_cli_commands``: Executors may vend CLI commands to users by
implementing this method, see the `CLI`_ section below for more details.
* ``get_task_log``: Executors may vend log messages to Airflow task logs by
implementing this method, see the `Logging`_ section below for more details.
diff --git a/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
b/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
index ec3e4f86202..d1ee96b72b7 100644
--- a/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
+++ b/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
@@ -61,7 +61,7 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: >-
-
airflow.providers.fab.auth_manager.api.auth.backend.basic_auth,airflow.api.auth.backend.session
+
airflow.providers.fab.auth_manager.api.auth.backend.basic_auth,airflow.providers.fab.auth_manager.api.auth.backend.session
AIRFLOW__WORKERS__EXECUTION_API_SERVER_URL:
'http://airflow-apiserver:8080/execution/'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
diff --git a/newsfragments/47264.significant.rst
b/newsfragments/47264.significant.rst
new file mode 100644
index 00000000000..d66c18dd126
--- /dev/null
+++ b/newsfragments/47264.significant.rst
@@ -0,0 +1,16 @@
+Removed leftover deprecations prior to 3.0.0.
+
+* Removed the ``RemovedInAirflow3Warning`` warning class.
+* Removed the deprecated module ``airflow.api.auth.backend.session``. Please
use ``airflow.providers.fab.auth_manager.api.auth.backend.session`` instead.
+* Removed the deprecated ``cleanup_stuck_queued_tasks`` method from the
``BaseExecutor`` interface. It is replaced by function ``revoke_task``.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [ ] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [x] Code interface changes
diff --git a/providers/edge/docs/edge_executor.rst
b/providers/edge/docs/edge_executor.rst
index 766f48a37bf..ca044bd14b6 100644
--- a/providers/edge/docs/edge_executor.rst
+++ b/providers/edge/docs/edge_executor.rst
@@ -134,7 +134,6 @@ before use. The following features have been initially
tested and are working:
optimized for scalability. This will need to be considered in future
releases. A dedicated performance
assessment is to be completed ensuring that in a hybrid setup other
executors are not impacted before
version 1.0.0 is to be released.
- - Stuck tasks in queue are not explicitly handled as
``cleanup_stuck_queued_tasks()`` is not implemented.
Architecture
diff --git a/providers/fab/docs/auth-manager/api-authentication.rst
b/providers/fab/docs/auth-manager/api-authentication.rst
index cfebe3f8cfe..8307d4d5f18 100644
--- a/providers/fab/docs/auth-manager/api-authentication.rst
+++ b/providers/fab/docs/auth-manager/api-authentication.rst
@@ -24,7 +24,7 @@ check the user session:
.. code-block:: ini
[api]
- auth_backends = airflow.api.auth.backend.session
+ auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session
.. versionchanged:: 1.10.11
diff --git a/pyproject.toml b/pyproject.toml
index a55329ed8ad..b97741ab186 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -543,7 +543,6 @@ filterwarnings = [
# Instead of that, we use a separate parameter and dynamically add it into
`filterwarnings` marker.
# Add airflow.exceptions.RemovedInAirflow4Warning when min provider version
for providers is 3.0
forbidden_warnings = [
- "airflow.exceptions.RemovedInAirflow3Warning",
"airflow.exceptions.AirflowProviderDeprecationWarning",
]
python_files = [
diff --git a/scripts/ci/pre_commit/check_deprecations.py
b/scripts/ci/pre_commit/check_deprecations.py
index 1920143fbc2..13717c9a91f 100755
--- a/scripts/ci/pre_commit/check_deprecations.py
+++ b/scripts/ci/pre_commit/check_deprecations.py
@@ -32,7 +32,6 @@ console = Console(color_system="standard", width=200)
allowed_warnings: dict[str, tuple[str, ...]] = {
- "airflow": ("airflow.exceptions.RemovedInAirflow3Warning",),
"providers": ("airflow.exceptions.AirflowProviderDeprecationWarning",),
}
compatible_decorators: frozenset[tuple[str, ...]] = frozenset(
diff --git a/scripts/ci/testing/summarize_captured_warnings.py
b/scripts/ci/testing/summarize_captured_warnings.py
index 588bd91c0d9..1286359c3b8 100755
--- a/scripts/ci/testing/summarize_captured_warnings.py
+++ b/scripts/ci/testing/summarize_captured_warnings.py
@@ -48,7 +48,6 @@ IMPORTANT_WARNING_SIGN = {
"pydantic.warnings.PydanticDeprecatedSince20": "!!",
"celery.exceptions.CPendingDeprecationWarning": "!!",
"pytest.PytestWarning": "!!",
- "airflow.exceptions.RemovedInAirflow3Warning": "!",
"airflow.exceptions.AirflowProviderDeprecationWarning": "!",
}
# Always print messages for these warning categories
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index c90f1d2e4f2..626bfa4a66d 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -346,7 +346,7 @@ function start_api_server_with_examples(){
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
- export
AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
+ export
AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
echo
echo "${COLOR_BLUE}Initializing database${COLOR_RESET}"
diff --git a/tests/always/test_example_dags.py
b/tests/always/test_example_dags.py
index c899d6230ba..043d853aec3 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -171,14 +171,6 @@ def example_not_excluded_dags(xfail_db_exception: bool =
False):
f"Skipping {candidate} because providers are not
included for {default_branch} branch."
)
continue
- # Do not raise an error for
airflow.exceptions.RemovedInAirflow3Warning.
- # We should not rush to enforce new syntax updates in providers
- # because a version of Airflow that deprecates certain
features may not yet be released.
- # Instead, it is advisable to periodically review the warning
reports and implement manual
- # updates as needed.
- param_marks.append(
-
pytest.mark.filterwarnings("default::airflow.exceptions.RemovedInAirflow3Warning")
- )
if
candidate.endswith(IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING):
param_marks.append(
pytest.mark.filterwarnings(
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index c205fce9546..e8489e556f7 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -623,7 +623,7 @@ key3 = value3
"api": {
"auth_backends": (
re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
- "airflow.api.auth.backend.session",
+
"airflow.providers.fab.auth_manager.api.auth.backend.session",
"3.0",
),
},
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b4ebc4761e3..e3f32f89eb8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2110,57 +2110,6 @@ class TestSchedulerJob:
# Second executor called for ti3
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
- def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session,
mock_executors):
- """
- Verify backward compatibility of the executor interface w.r.t. stuck
queued.
-
- Prior to #43520, scheduler called method `cleanup_stuck_queued_tasks`,
which failed tis.
-
- After #43520, scheduler calls `cleanup_tasks_stuck_in_queued`, which
requeues tis.
-
- At Airflow 3.0, we should remove backcompat support for this old
function. But for now
- we verify that we call it as a fallback.
- """
- # todo: remove in airflow 3.0
- with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
- op1 = EmptyOperator(task_id="op1")
- op2 = EmptyOperator(task_id="op2", executor="default_exec")
- op3 = EmptyOperator(task_id="op3", executor="secondary_exec")
-
- dr = dag_maker.create_dagrun()
- ti1 = dr.get_task_instance(task_id=op1.task_id, session=session)
- ti2 = dr.get_task_instance(task_id=op2.task_id, session=session)
- ti3 = dr.get_task_instance(task_id=op3.task_id, session=session)
- for ti in [ti1, ti2, ti3]:
- ti.state = State.QUEUED
- ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
- session.commit()
- scheduler_job = Job()
- job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
- job_runner._task_queued_timeout = 300
- mock_exec_1 = mock_executors[0]
- mock_exec_2 = mock_executors[1]
- mock_exec_1.revoke_task.side_effect = NotImplementedError
- mock_exec_2.revoke_task.side_effect = NotImplementedError
-
- with
mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as
loader_mock:
- # The executors are mocked, so cannot be loaded/imported. Mock
load_executor and return the
- # correct object for the given input executor name.
- loader_mock.side_effect = lambda *x: {
- ("default_exec",): mock_exec_1,
- (None,): mock_exec_1,
- ("secondary_exec",): mock_exec_2,
- }[x]
- job_runner._handle_tasks_stuck_in_queued()
-
- # Default executor is called for ti1 (no explicit executor override
uses default) and ti2 (where we
- # explicitly marked that for execution by the default executor)
- try:
-
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
- except AssertionError:
-
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
-
mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
-
@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker,
session, mock_executors):
"""Verify that tasks stuck in queued will be rescheduled up to N
times."""
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index da8b440423c..d9c27d8eb08 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -53,7 +53,6 @@ from airflow.decorators.base import DecoratedOperator
from airflow.exceptions import (
AirflowException,
ParamValidationError,
- RemovedInAirflow3Warning,
SerializationError,
)
from airflow.hooks.base import BaseHook
@@ -980,15 +979,7 @@ class TestStringifiedDAGs:
assert "params" in serialized_dag["dag"]
- if val and any([True for k, v in val.items() if isinstance(v, set)]):
- with pytest.warns(
- RemovedInAirflow3Warning,
- match="The use of non-json-serializable params is deprecated
and will be removed in a future release",
- ):
- deserialized_dag = SerializedDAG.from_dict(serialized_dag)
-
- else:
- deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+ deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_simple_task = deserialized_dag.task_dict["simple_task"]
assert expected_val == deserialized_dag.params.dump()
assert expected_val == deserialized_simple_task.params.dump()