This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fffab39f996 Drop Airflow 2 Support in Edge Provider (#59143)
fffab39f996 is described below

commit fffab39f99607e3c54842342fdc982babd403707
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Dec 7 23:53:40 2025 +0100

    Drop Airflow 2 Support in Edge Provider (#59143)
    
    * Drop Airflow 2 Support in Edge Provider
    
    * Fix pytests
    
    * Add note about support in changelog
---
 .pre-commit-config.yaml                            |   3 +-
 .rat-excludes                                      |   2 +-
 dev/breeze/src/airflow_breeze/global_constants.py  |   2 +-
 go-sdk/pkg/edgeapi/client.go                       |   2 +-
 providers/edge3/.pre-commit-config.yaml            |   2 +-
 providers/edge3/README.rst                         |   2 +-
 providers/edge3/docs/architecture.rst              |   4 +-
 providers/edge3/docs/changelog.rst                 |   8 +
 providers/edge3/docs/deployment.rst                |   4 +-
 providers/edge3/docs/index.rst                     |   4 +-
 providers/edge3/docs/ui_plugin.rst                 |   6 +-
 providers/edge3/pyproject.toml                     |   2 +-
 .../edge3/src/airflow/providers/edge3/__init__.py  |   4 +-
 .../src/airflow/providers/edge3/cli/api_client.py  |  35 +-
 .../src/airflow/providers/edge3/cli/worker.py      |  28 +-
 .../providers/edge3/executors/edge_executor.py     |  67 +-
 .../airflow/providers/edge3/openapi/__init__.py    |  19 -
 .../edge3/openapi/edge_worker_api_v1.yaml          | 808 ---------------------
 .../edge3/plugins/edge_executor_plugin.py          | 231 +-----
 .../providers/edge3/plugins/www/package.json       |   2 +-
 .../src/airflow/providers/edge3/version_compat.py  |   2 -
 .../src/airflow/providers/edge3/worker_api/auth.py |  46 +-
 .../providers/edge3/worker_api/datamodels.py       |   5 +-
 .../edge3/worker_api/routes/_v2_compat.py          | 144 ----
 .../edge3/worker_api/routes/_v2_routes.py          | 237 ------
 .../providers/edge3/worker_api/routes/health.py    |   2 +-
 .../providers/edge3/worker_api/routes/jobs.py      |  18 +-
 .../providers/edge3/worker_api/routes/logs.py      |  13 +-
 .../providers/edge3/worker_api/routes/worker.py    |  14 +-
 .../{openapi => worker_api}/v2-edge-generated.yaml |   3 +-
 .../edge3/tests/unit/edge3/cli/test_worker.py      |  68 +-
 .../unit/edge3/executors/test_edge_executor.py     | 174 +----
 .../edge3/plugins/test_edge_executor_plugin.py     |  48 +-
 .../unit/edge3/worker_api/routes/test_worker.py    |   2 +-
 providers/edge3/www-hash.txt                       |   2 +-
 .../run_generate_openapi_spec_providers.py         |   2 +-
 36 files changed, 138 insertions(+), 1877 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index d25d26bce0d..ebb6b15b31b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -268,7 +268,8 @@ repos:
           ^.*openapi.*\.yaml$|
           ^\.pre-commit-config\.yaml$|
           ^.*reproducible_build\.yaml$|
-          ^.*pnpm-lock\.yaml$
+          ^.*pnpm-lock\.yaml$|
+          ^.*-generated\.yaml$
   - repo: https://github.com/ikamensh/flynt
     rev: 97be693bf18bc2f050667dd282d243e2824b81e2  # frozen: 1.0.6
     hooks:
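
A minimal sketch, not part of this commit, of what the newly added exclude alternative above matches (the path below is the relocated spec from this very change):

    import re

    # ^.*-generated\.yaml$ is the alternative added to the hook's pattern list;
    # it now skips generated OpenAPI specs wherever they live in the tree.
    pattern = re.compile(r"^.*-generated\.yaml$")
    print(bool(pattern.match("providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml")))  # True
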
diff --git a/.rat-excludes b/.rat-excludes
index 3068aca8872..47438f0fe90 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -313,7 +313,7 @@ www-hash.txt
 *generated.*
 
/src/airflow/providers/keycloak/auth_manager/openapi/v2-keycloak-auth-manager-generated.yaml
 /src/airflow/providers/edge3/plugins/www/*
-/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
 
/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v2-fab-auth-manager-generated.yaml
 /src/airflow/providers/fab/www/static/dist/*
 
/any/dag_id=dag_for_testing_redis_task_handler/run_id=test/task_id=task_for_testing_redis_log_handler/attempt=1.log
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index 85651f14ee8..7cef7d4bf07 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -736,7 +736,7 @@ PROVIDERS_COMPATIBILITY_TESTS_MATRIX: list[dict[str, str | list[str]]] = [
     {
         "python-version": "3.10",
         "airflow-version": "2.11.0",
-        "remove-providers": "common.messaging fab git keycloak",
+        "remove-providers": "common.messaging edge3 fab git keycloak",
         "run-unit-tests": "true",
     },
     {
diff --git a/go-sdk/pkg/edgeapi/client.go b/go-sdk/pkg/edgeapi/client.go
index d4d888ffbbf..ef81645b287 100644
--- a/go-sdk/pkg/edgeapi/client.go
+++ b/go-sdk/pkg/edgeapi/client.go
@@ -27,7 +27,7 @@ import (
 
 //go:generate -command openapi-gen go run github.com/ashb/oapi-resty-codegen@latest --config oapi-codegen.yml
 
-//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
 
 func WithEdgeAPIJWTKey(key []byte) ClientOption {
        return func(c *Client) error {
diff --git a/providers/edge3/.pre-commit-config.yaml b/providers/edge3/.pre-commit-config.yaml
index 230ead00cf3..47b0c6ec6f7 100644
--- a/providers/edge3/.pre-commit-config.yaml
+++ b/providers/edge3/.pre-commit-config.yaml
@@ -37,7 +37,7 @@ repos:
         files: |
           (?x)
           ^src/airflow/providers/edge3/plugins/www/.*\.(js|ts|tsx|yaml|css|json)$|
-          ^src/airflow/providers/edge3/openapi/v2-edge-generated.yaml$
+          ^src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml$
         exclude: |
           (?x)
           ^src/airflow/providers/edge3/plugins/www/node-modules/.*|
diff --git a/providers/edge3/README.rst b/providers/edge3/README.rst
index 2aa03b15bff..c288aef4338 100644
--- a/providers/edge3/README.rst
+++ b/providers/edge3/README.rst
@@ -65,7 +65,7 @@ Requirements
 ==========================================  ====================
 PIP package                                 Version required
 ==========================================  ====================
-``apache-airflow``                          ``>=2.11.0,!=3.1.0``
+``apache-airflow``                          ``>=3.0.0,!=3.1.0``
 ``apache-airflow-providers-common-compat``  ``>=1.10.0``
 ``pydantic``                                ``>=2.11.0``
 ``retryhttp``                               ``>=1.2.0,!=1.3.0``
diff --git a/providers/edge3/docs/architecture.rst b/providers/edge3/docs/architecture.rst
index 5c38b1ab95d..5a1f7bb070a 100644
--- a/providers/edge3/docs/architecture.rst
+++ b/providers/edge3/docs/architecture.rst
@@ -63,7 +63,7 @@ deployed outside of the central Airflow cluster is connected via HTTP(s) to the
 * **Workers** (Central) - Execute the assigned tasks - most standard setup has local or centralized workers, e.g. via Celery
 * **Edge Workers** - Special workers which pull tasks via HTTP(s) as provided as feature via this provider package
 * **Scheduler** - Responsible for adding the necessary tasks to the queue. The EdgeExecutor is running as a module inside the scheduler.
-* **API server** (webserver in Airflow 2.x) - HTTP REST API Server provides access to Dag/task status information. The required end-points are
+* **API server** - HTTP REST API Server provides access to Dag/task status information. The required end-points are
   provided by the Edge provider plugin. The Edge Worker uses this API to pull tasks and send back the results.
 * **Database** - Contains information about the status of tasks, Dags, Variables, connections, etc.
 
@@ -77,7 +77,7 @@ In detail the parts of the Edge provider are deployed as follows:
   need to set the ``executor`` configuration option in the ``airflow.cfg`` file to
   ``airflow.providers.edge3.executors.EdgeExecutor``. For more details see :doc:`edge_executor`. Note that also
   multiple executors can be used in parallel together with the EdgeExecutor.
-* **API server** (webserver in Airflow 2.x) - The API server is providing REST endpoints to the web UI as well
+* **API server** - The API server is providing REST endpoints to the web UI as well
   as serves static files. The Edge provider adds a plugin that provides additional REST API for the Edge Worker
   as well as UI elements to manage workers (not available in Airflow 3.0).
   The API server is responsible for handling requests from the Edge Worker and sending back the results. To
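
As a pointer for the ``executor`` option mentioned in the hunk above, a minimal sketch, not part of this commit; the environment variable is the standard spelling of the ``[core] executor`` setting in ``airflow.cfg``:

    import os

    # Equivalent to setting executor = airflow.providers.edge3.executors.EdgeExecutor
    # in the [core] section of airflow.cfg; must be set before Airflow starts.
    os.environ["AIRFLOW__CORE__EXECUTOR"] = "airflow.providers.edge3.executors.EdgeExecutor"
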
diff --git a/providers/edge3/docs/changelog.rst b/providers/edge3/docs/changelog.rst
index 24c3883836f..00b86c138bf 100644
--- a/providers/edge3/docs/changelog.rst
+++ b/providers/edge3/docs/changelog.rst
@@ -27,6 +27,14 @@
 Changelog
 ---------
 
+.. warning::
+    This release of the Edge3 provider drops support for Airflow versions below 3.0.0.
+
+    Support for Airflow 2.10-2.11 was experimental, and the provider is GA only for Airflow 3.0+.
+    Production use on Airflow 2.x was never intended, so support for Airflow 2.x is dropped
+    earlier than the usual release support policy would indicate.
+
+
 1.6.0
 .....
 
diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst
index d0387c41946..ff8a93def13 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -113,7 +113,7 @@ process as and wait until all running tasks are completed. Also in a console you
 If you want to monitor the remote activity and worker, use the UI plugin which
 is included in the provider package and install it on the api-server / webserver and use the
 "Admin" - "Edge Worker Hosts" and "Edge Worker Jobs" pages.
-(Note: The plugin is not available on Airflow 3.0 UI, it is only in 2.x and 3.1++)
+(Note: The plugin is not available on Airflow 3.0 UI, it is only in 3.1++)
 
 If you want to check status of the worker via CLI you can use the command
 
@@ -150,7 +150,7 @@ Worker status can be checked via the web UI in the "Admin" - "Edge Worker" page.
 
 .. note::
 
-    Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.10 and in 3.1 and newer.
+    Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
     Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.
 
 
diff --git a/providers/edge3/docs/index.rst b/providers/edge3/docs/index.rst
index 67f4b497cc4..94718902f04 100644
--- a/providers/edge3/docs/index.rst
+++ b/providers/edge3/docs/index.rst
@@ -108,12 +108,12 @@ For the minimum Airflow version supported, see ``Requirements`` below.
 Requirements
 ------------
 
-The minimum Apache Airflow version supported by this provider distribution is ``2.11.0``.
+The minimum Apache Airflow version supported by this provider distribution is ``3.0.0``.
 
 ==========================================  ====================
 PIP package                                 Version required
 ==========================================  ====================
-``apache-airflow``                          ``>=2.11.0,!=3.1.0``
+``apache-airflow``                          ``>=3.0.0,!=3.1.0``
 ``apache-airflow-providers-common-compat``  ``>=1.10.0``
 ``pydantic``                                ``>=2.11.0``
 ``retryhttp``                               ``>=1.2.0,!=1.3.0``
diff --git a/providers/edge3/docs/ui_plugin.rst b/providers/edge3/docs/ui_plugin.rst
index 043892e1ffc..bbf621e4599 100644
--- a/providers/edge3/docs/ui_plugin.rst
+++ b/providers/edge3/docs/ui_plugin.rst
@@ -22,7 +22,7 @@ The Edge provider uses a Plugin to
 
 - Extend the REST API endpoints for connecting workers to the Airflow cluster
 - Provide a web UI for managing the workers and monitoring their status and tasks
-  (Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.)
+  (Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 3.1 and newer.)
 
 REST API endpoints
 ------------------
@@ -35,14 +35,14 @@ The Edge provider adds the following REST API endpoints to the Airflow API:
 - ``/edge_worker/v1/health``: Check that the API endpoint is deployed and active
 
 To see full documentation of the API endpoints open the Airflow web UI and navigate to
-the sub-path ``/edge_worker/docs`` (Airflow 3.0) or ``/edge_worker/v1/ui`` (Airflow 2.x).
+the sub-path ``/edge_worker/docs``.
 
 Web UI Plugin
 -------------
 
 .. note::
 
-    Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.
+    Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
     Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.
 
 The Edge provider adds a web UI plugin to the Airflow web UI. The plugin is
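
A quick way to verify the endpoint list above after deployment, as a hedged sketch that is not part of this commit (host and port are assumptions for a local api-server):

    import requests

    # /edge_worker/v1/health is documented above; any 2xx response means the
    # Edge Worker API is deployed and active.
    response = requests.get("http://localhost:8080/edge_worker/v1/health")
    response.raise_for_status()
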
diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml
index 41fd6d063c6..077449e9a82 100644
--- a/providers/edge3/pyproject.toml
+++ b/providers/edge3/pyproject.toml
@@ -58,7 +58,7 @@ requires-python = ">=3.10"
 # Make sure to run ``prek update-providers-dependencies --all-files``
 # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
 dependencies = [
-    "apache-airflow>=2.11.0,!=3.1.0",
+    "apache-airflow>=3.0.0,!=3.1.0",
     "apache-airflow-providers-common-compat>=1.10.0", # use next version
     "pydantic>=2.11.0",
     "retryhttp>=1.2.0,!=1.3.0",
diff --git a/providers/edge3/src/airflow/providers/edge3/__init__.py b/providers/edge3/src/airflow/providers/edge3/__init__.py
index 0ec10be5e51..a1d4c24f2d4 100644
--- a/providers/edge3/src/airflow/providers/edge3/__init__.py
+++ b/providers/edge3/src/airflow/providers/edge3/__init__.py
@@ -32,8 +32,8 @@ __all__ = ["__version__"]
 __version__ = "1.6.0"
 
 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
-    "2.11.0"
+    "3.0.0"
 ):
     raise RuntimeError(
-        f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 2.11.0+"
+        f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 3.0.0+"
     )
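
For readers puzzled by the nested parse calls in the gate above: ``base_version`` strips pre-release and dev suffixes before comparing. A minimal sketch, not part of this commit, with a hypothetical installed version:

    import packaging.version

    # "3.1.0.dev0" sorts before "3.1.0", so a naive comparison would reject it;
    # comparing base_version ("3.1.0") keeps dev and rc builds of 3.x working.
    installed = packaging.version.parse("3.1.0.dev0")
    base = packaging.version.parse(installed.base_version)
    print(base >= packaging.version.parse("3.0.0"))  # True
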
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
index ac08b1c50eb..e0b0f7e8e19 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
@@ -20,6 +20,7 @@ import json
 import logging
 import os
 from datetime import datetime
+from functools import cache
 from http import HTTPStatus
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
@@ -29,12 +30,12 @@ import requests
 from retryhttp import retry, wait_retry_after
 from tenacity import before_sleep_log, wait_random_exponential
 
+from airflow.api_fastapi.auth.tokens import JWTGenerator
 from airflow.configuration import conf
 from airflow.providers.edge3.models.edge_worker import (
     EdgeWorkerDuplicateException,
     EdgeWorkerVersionException,
 )
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.edge3.worker_api.datamodels import (
     EdgeJobFetched,
     PushLogsBody,
@@ -74,6 +75,15 @@ API_RETRY_WAIT_MAX = float(
 _default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
 
 
+@cache
+def jwt_generator() -> JWTGenerator:
+    return JWTGenerator(
+        secret_key=conf.get("api_auth", "jwt_secret"),
+        valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
+        audience="api",
+    )
+
+
 @retry(
     reraise=True,
     max_attempt_number=API_RETRIES,
@@ -84,28 +94,7 @@ _default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WA
     before_sleep=before_sleep_log(logger, logging.WARNING),
 )
 def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
-    if AIRFLOW_V_3_0_PLUS:
-        from functools import cache
-
-        from airflow.api_fastapi.auth.tokens import JWTGenerator
-
-        @cache
-        def jwt_generator() -> JWTGenerator:
-            return JWTGenerator(
-                secret_key=conf.get("api_auth", "jwt_secret"),
-                valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
-                audience="api",
-            )
-
-        generator = jwt_generator()
-        authorization = generator.generate({"method": rest_path})
-    else:
-        # Airflow 2.10 compatibility
-        from airflow.providers.edge3.worker_api.auth import jwt_signer
-
-        signer = jwt_signer()
-        authorization = signer.generate_signed_token({"method": rest_path})
-
+    authorization = jwt_generator().generate({"method": rest_path})
     api_url = conf.get("edge", "api_url")
     headers = {
         "Content-Type": "application/json",
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 739dfd3bd69..f7707922640 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -25,7 +25,6 @@ from functools import cache
 from http import HTTPStatus
 from multiprocessing import Process
 from pathlib import Path
-from subprocess import Popen
 from time import sleep
 from typing import TYPE_CHECKING
 
@@ -39,7 +38,6 @@ from airflow.providers.edge3 import __version__ as edge_provider_version
 from airflow.providers.edge3.cli.api_client import (
     jobs_fetch,
     jobs_set_state,
-    logs_logfile_path,
     logs_push,
     worker_register,
     worker_set_state,
@@ -56,7 +54,6 @@ from airflow.providers.edge3.models.edge_worker import (
     EdgeWorkerState,
     EdgeWorkerVersionException,
 )
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils.net import getfqdn
 from airflow.utils.state import TaskInstanceState
 
@@ -218,7 +215,7 @@ class EdgeWorker:
             return 1
 
     @staticmethod
-    def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
+    def _launch_job(edge_job: EdgeJobFetched):
         if TYPE_CHECKING:
             from airflow.executors.workloads import ExecuteTask
 
@@ -232,29 +229,6 @@ class EdgeWorker:
         if TYPE_CHECKING:
             assert workload.log_path  # We need to assume this is defined in here
         logfile = Path(base_log_folder, workload.log_path)
-        return process, logfile
-
-    @staticmethod
-    def _launch_job_af2_10(edge_job: EdgeJobFetched) -> tuple[Popen, Path]:
-        """Compatibility for Airflow 2.10 Launch."""
-        env = os.environ.copy()
-        env["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"
-        env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge", "api_url")
-        env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
-        command: list[str] = edge_job.command  # type: ignore[assignment]
-        process = Popen(command, close_fds=True, env=env, start_new_session=True)
-        logfile = logs_logfile_path(edge_job.key)
-        return process, logfile
-
-    @staticmethod
-    def _launch_job(edge_job: EdgeJobFetched):
-        """Get the received job executed."""
-        process: Popen | Process
-        if AIRFLOW_V_3_0_PLUS:
-            process, logfile = EdgeWorker._launch_job_af3(edge_job)
-        else:
-            # Airflow 2.10
-            process, logfile = EdgeWorker._launch_job_af2_10(edge_job)
         EdgeWorker.jobs.append(Job(edge_job, process, logfile, 0))
 
     def start(self):
diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index dc2821831cb..f918e4b8035 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm import Session
 
 from airflow.cli.cli_config import GroupCommand
 from airflow.configuration import conf
+from airflow.executors import workloads
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.common.compat.sdk import Stats, timezone
@@ -36,7 +37,6 @@ from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS
 from airflow.providers.edge3.models.edge_job import EdgeJobModel
 from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils.db import DBLocks, create_global_lock
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -68,8 +68,10 @@ class EdgeExecutor(BaseExecutor):
         """
         Check if already existing table matches the newest table schema.
 
-        workaround till support for Airflow 2.x is dropped,
+        workaround as Airflow 2.x had no support for provider DB migrations,
         then it is possible to use alembic also for provider distributions.
+
+        TODO(jscheffl): Change to alembic DB migrations in the future.
         """
         inspector = inspect(engine)
         edge_job_columns = None
@@ -124,66 +126,13 @@ class EdgeExecutor(BaseExecutor):
         self.edge_queued_tasks = deepcopy(self.queued_tasks)
         super()._process_tasks(task_tuples)  # type: ignore[misc]
 
-    @provide_session
-    def execute_async(
-        self,
-        key: TaskInstanceKey,
-        command: CommandType,
-        queue: str | None = None,
-        executor_config: Any | None = None,
-        session: Session = NEW_SESSION,
-    ) -> None:
-        """Execute asynchronously. Airflow 2.10 entry point to execute a task."""
-        # Use of a temporary trick to get task instance, will be changed with Airflow 3.0.0
-        # code works together with _process_tasks overwrite to get task instance.
-        # TaskInstance in fourth element
-        task_instance = self.edge_queued_tasks[key][3]  # type: ignore[index]
-        del self.edge_queued_tasks[key]
-
-        self.validate_airflow_tasks_run_command(command)  # type: ignore[attr-defined]
-
-        # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number
-        existing_job = (
-            session.query(EdgeJobModel)
-            .filter_by(
-                dag_id=key.dag_id,
-                task_id=key.task_id,
-                run_id=key.run_id,
-                map_index=key.map_index,
-                try_number=key.try_number,
-            )
-            .first()
-        )
-
-        if existing_job:
-            existing_job.state = TaskInstanceState.QUEUED
-            existing_job.queue = queue or DEFAULT_QUEUE
-            existing_job.concurrency_slots = task_instance.pool_slots
-            existing_job.command = str(command)
-        else:
-            session.add(
-                EdgeJobModel(
-                    dag_id=key.dag_id,
-                    task_id=key.task_id,
-                    run_id=key.run_id,
-                    map_index=key.map_index,
-                    try_number=key.try_number,
-                    state=TaskInstanceState.QUEUED,
-                    queue=queue or DEFAULT_QUEUE,
-                    concurrency_slots=task_instance.pool_slots,
-                    command=str(command),
-                )
-            )
-
     @provide_session
     def queue_workload(
         self,
-        workload: Any,  # Note actually "airflow.executors.workloads.All" but not existing in Airflow 2.10
+        workload: workloads.All,
         session: Session = NEW_SESSION,
     ) -> None:
         """Put new workload to queue. Airflow 3 entry point to execute a task."""
-        from airflow.executors import workloads
-
         if not isinstance(workload, workloads.ExecuteTask):
             raise TypeError(f"Don't know how to queue workload of type {type(workload).__name__}")
 
@@ -262,11 +211,7 @@ class EdgeExecutor(BaseExecutor):
 
     def _update_orphaned_jobs(self, session: Session) -> bool:
         """Update status ob jobs when workers die and don't update anymore."""
-        if AIRFLOW_V_3_0_PLUS:
-            heartbeat_interval_config_name = "task_instance_heartbeat_timeout"
-        else:
-            heartbeat_interval_config_name = "scheduler_zombie_task_threshold"
-        heartbeat_interval: int = conf.getint("scheduler", heartbeat_interval_config_name)
+        heartbeat_interval: int = conf.getint("scheduler", "task_instance_heartbeat_timeout")
         lifeless_jobs: list[EdgeJobModel] = (
             session.query(EdgeJobModel)
             .with_for_update(skip_locked=True)
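
One practical consequence of the hunk above, as a sketch that is not part of this commit: with the Airflow 2 branch gone, the orphan-job check reads the Airflow 3 config key unconditionally.

    from airflow.configuration import conf

    # Under Airflow 3, [scheduler] task_instance_heartbeat_timeout replaces the
    # scheduler_zombie_task_threshold key that the removed branch fell back to.
    timeout = conf.getint("scheduler", "task_instance_heartbeat_timeout")
    print(f"jobs silent for more than {timeout}s are treated as lifeless")
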
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py b/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py
deleted file mode 100644
index 49b78ecf1d8..00000000000
--- a/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""OpenAPI Specs for Connexion API in Airflow 2.10.x."""
-
-# Note: This module folder is to be removed once Airflow 2.10.x support is removed.
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml b/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml
deleted file mode 100644
index 1bbadd20cd0..00000000000
--- a/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml
+++ /dev/null
@@ -1,808 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
----
-openapi: 3.0.2
-info:
-  title: Airflow Edge Worker API
-  version: 1.0.0
-  description: |
-    This is Airflow Edge Worker API - which is a the access endpoint for workers
-    running on remote sites serving for Apache Airflow jobs. It also proxies internal API
-    to edge endpoints.
-
-    It is not intended to be used by any external code.
-
-    You can find more information in AIP-69
-    https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=301795932
-
-
-servers:
-  - url: /edge_worker/v1
-    description: Airflow Edge Worker API
-paths:
-  /worker/{worker_name}:
-    patch:
-      description: Set state of worker and returns the current assigned queues.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: set_state_v2
-      parameters:
-      - description: Hostname or instance name of the worker
-        in: path
-        name: worker_name
-        required: true
-        schema:
-          description: Hostname or instance name of the worker
-          title: Worker Name
-          type: string
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/WorkerStateBody'
-              description: State of the worker with details
-              examples:
-              - jobs_active: 3
-                queues:
-                - large_node
-                - wisconsin_site
-                state: running
-                sysinfo:
-                  airflow_version: 2.10.0
-                  concurrency: 4
-                  edge_provider_version: 1.0.0
-              title: Worker State
-        required: true
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/WorkerSetStateReturn'
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: Set State
-      tags:
-      - Worker
-    post:
-      description: Register a new worker to the backend.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: register_v2
-      parameters:
-      - description: Hostname or instance name of the worker
-        in: path
-        name: worker_name
-        required: true
-        schema:
-          description: Hostname or instance name of the worker
-          title: Worker Name
-          type: string
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/WorkerStateBody'
-              description: State of the worker with details
-              examples:
-              - jobs_active: 3
-                queues:
-                - large_node
-                - wisconsin_site
-                state: running
-                sysinfo:
-                  airflow_version: 2.10.0
-                  concurrency: 4
-                  edge_provider_version: 1.0.0
-              title: Worker State
-        required: true
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/WorkerRegistrationReturn"
-                description: Registration response with the last update time of the worker.
-                examples:
-                - last_update: "2025-04-04T13:59:58.773870"
-                title: Worker Registration
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: Register
-      tags:
-      - Worker
-  /jobs/fetch/{worker_name}:
-    post:
-      description: Fetch a job to execute on the edge worker.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: job_fetch_v2
-      parameters:
-      - in: path
-        name: worker_name
-        required: true
-        schema:
-          title: Worker Name
-          type: string
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/WorkerQueuesBody'
-              description: The worker remote has no access to log sink and with this
-                can send log chunks to the central site.
-              title: Log data chunks
-        required: true
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                anyOf:
-                - $ref: '#/components/schemas/EdgeJobFetched'
-                - type: object
-                  nullable: true
-                title: Response Fetch
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: Fetch
-      tags:
-      - Jobs
-  /jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}:
-    patch:
-      description: Update the state of a job running on the edge worker.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: job_state_v2
-      parameters:
-      - description: Identifier of the DAG to which the task belongs.
-        in: path
-        name: dag_id
-        required: true
-        schema:
-          description: Identifier of the DAG to which the task belongs.
-          title: Dag ID
-          type: string
-      - description: Task name in the DAG.
-        in: path
-        name: task_id
-        required: true
-        schema:
-          description: Task name in the DAG.
-          title: Task ID
-          type: string
-      - description: Run ID of the DAG execution.
-        in: path
-        name: run_id
-        required: true
-        schema:
-          description: Run ID of the DAG execution.
-          title: Run ID
-          type: string
-      - description: The number of attempt to execute this task.
-        in: path
-        name: try_number
-        required: true
-        schema:
-          description: The number of attempt to execute this task.
-          title: Try Number
-          type: integer
-      - description: For dynamically mapped tasks the mapping number, -1 if the task
-          is not mapped.
-        in: path
-        name: map_index
-        required: true
-        schema:
-          description: For dynamically mapped tasks the mapping number, -1 if the
-            task is not mapped.
-          title: Map Index
-          type: string  # This should be integer, but Connexion/Flask do not support negative integers in path parameters
-      - description: State of the assigned task under execution.
-        in: path
-        name: state
-        required: true
-        schema:
-          $ref: '#/components/schemas/TaskInstanceState'
-          description: State of the assigned task under execution.
-          title: Task State
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                title: Response State
-                type: object
-                nullable: true
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: State
-      tags:
-      - Jobs
-  /logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
-    get:
-      description: Elaborate the path and filename to expect from task execution.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: logfile_path_v2
-      parameters:
-      - description: Identifier of the DAG to which the task belongs.
-        in: path
-        name: dag_id
-        required: true
-        schema:
-          description: Identifier of the DAG to which the task belongs.
-          title: Dag ID
-          type: string
-      - description: Task name in the DAG.
-        in: path
-        name: task_id
-        required: true
-        schema:
-          description: Task name in the DAG.
-          title: Task ID
-          type: string
-      - description: Run ID of the DAG execution.
-        in: path
-        name: run_id
-        required: true
-        schema:
-          description: Run ID of the DAG execution.
-          title: Run ID
-          type: string
-      - description: The number of attempt to execute this task.
-        in: path
-        name: try_number
-        required: true
-        schema:
-          description: The number of attempt to execute this task.
-          title: Try Number
-          type: integer
-      - description: For dynamically mapped tasks the mapping number, -1 if the task
-          is not mapped.
-        in: path
-        name: map_index
-        required: true
-        schema:
-          description: For dynamically mapped tasks the mapping number, -1 if the
-            task is not mapped.
-          title: Map Index
-          type: string  # This should be integer, but Connexion/Flask do not support negative integers in path parameters
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                title: Response Logfile Path
-                type: string
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: Logfile Path
-      tags:
-      - Logs
-  /logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
-    post:
-      description: Push an incremental log chunk from Edge Worker to central site.
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: push_logs_v2
-      parameters:
-      - description: Identifier of the DAG to which the task belongs.
-        in: path
-        name: dag_id
-        required: true
-        schema:
-          description: Identifier of the DAG to which the task belongs.
-          title: Dag ID
-          type: string
-      - description: Task name in the DAG.
-        in: path
-        name: task_id
-        required: true
-        schema:
-          description: Task name in the DAG.
-          title: Task ID
-          type: string
-      - description: Run ID of the DAG execution.
-        in: path
-        name: run_id
-        required: true
-        schema:
-          description: Run ID of the DAG execution.
-          title: Run ID
-          type: string
-      - description: The number of attempt to execute this task.
-        in: path
-        name: try_number
-        required: true
-        schema:
-          description: The number of attempt to execute this task.
-          title: Try Number
-          type: integer
-      - description: For dynamically mapped tasks the mapping number, -1 if the task
-          is not mapped.
-        in: path
-        name: map_index
-        required: true
-        schema:
-          description: For dynamically mapped tasks the mapping number, -1 if the
-            task is not mapped.
-          title: Map Index
-          type: string  # This should be integer, but Connexion/Flask do not support negative integers in path parameters
-      - description: JWT Authorization Token
-        in: header
-        name: authorization
-        required: true
-        schema:
-          description: JWT Authorization Token
-          title: Authorization
-          type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/PushLogsBody'
-              description: The worker remote has no access to log sink and with this
-                can send log chunks to the central site.
-              title: Log data chunks
-        required: true
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                title: Response Push Logs
-                type: object
-                nullable: true
-          description: Successful Response
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '422':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-          description: Validation Error
-      summary: Push Logs
-      tags:
-      - Logs
-  /rpcapi:
-    post:
-      deprecated: false
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
-      operationId: rpcapi_v2
-      tags:
-      - JSONRPC
-      parameters: []
-      responses:
-        '200':
-          description: Successful response
-      requestBody:
-        x-body-name: body
-        required: true
-        content:
-          application/json:
-            schema:
-              type: object
-              required:
-              - method
-              - jsonrpc
-              - params
-              properties:
-                jsonrpc:
-                  type: string
-                  default: '2.0'
-                  description: JSON-RPC Version (2.0)
-                method:
-                  type: string
-                  description: Method name
-                params:
-                  title: Parameters
-                  type: object
-  /health:
-    get:
-      operationId: health
-      deprecated: false
-      x-openapi-router-controller: airflow.providers.edge3.worker_api.routes.health
-      tags:
-      - JSONRPC
-      parameters: []
-      responses:
-        '200':
-          description: Successful response
-x-headers: []
-x-explorer-enabled: true
-x-proxy-enabled: true
-components:
-  schemas:
-    JsonRpcRequired:
-      type: object
-      required:
-      - method
-      - jsonrpc
-      properties:
-        method:
-          type: string
-          description: Method name
-        jsonrpc:
-          type: string
-          default: '2.0'
-          description: JSON-RPC Version (2.0)
-      discriminator:
-        propertyName: method_name
-    EdgeWorkerState:
-      description: Status of a Edge Worker instance.
-      enum:
-      - starting
-      - running
-      - idle
-      - terminating
-      - offline
-      - unknown
-      - maintenance mode
-      - maintenance request
-      - maintenance pending
-      - maintenance exit
-      - offline maintenance
-      title: EdgeWorkerState
-      type: string
-    WorkerStateBody:
-      description: Details of the worker state sent to the scheduler.
-      type: object
-      required:
-      - state
-      - queues
-      - sysinfo
-      properties:
-        jobs_active:
-          default: 0
-          description: Number of active jobs the worker is running.
-          title: Jobs Active
-          type: integer
-        queues:
-          anyOf:
-          - items:
-              type: string
-            type: array
-          - type: object
-            nullable: true
-          description: List of queues the worker is pulling jobs from. If not provided,
-            worker pulls from all queues.
-          title: Queues
-        state:
-          $ref: '#/components/schemas/EdgeWorkerState'
-          description: State of the worker from the view of the worker.
-        sysinfo:
-          description: System information of the worker.
-          title: Sysinfo
-          type: object
-        maintenance_comments:
-          description: Comments about the maintenance state of the worker.
-          title: Maintenance Comments
-          anyOf:
-          - type: string
-          - type: object
-            nullable: true
-      title: WorkerStateBody
-    WorkerQueuesBody:
-      description: Queues that a worker supports to run jobs on.
-      properties:
-        queues:
-          anyOf:
-          - items:
-              type: string
-            type: array
-          - type: object
-            nullable: true
-          description: List of queues the worker is pulling jobs from. If not provided,
-            worker pulls from all queues.
-          title: Queues
-        free_concurrency:
-          description: Number of free slots for running tasks.
-          title: Free Concurrency
-          type: integer
-      required:
-      - queues
-      - free_concurrency
-      title: WorkerQueuesBody
-      type: object
-    WorkerRegistrationReturn:
-      description: The response of the worker registration.
-      properties:
-        last_update:
-          description: Time of the last update of the worker.
-          format: date-time
-          title: Last Update
-          type: string
-      title: WorkerRegistrationReturn
-    WorkerSetStateReturn:
-      description: The state written in the database
-      properties:
-        queues:
-          anyOf:
-          - items:
-              type: string
-            type: array
-          - type: object
-            nullable: true
-          description: List of queues the worker is pulling jobs from. If not provided,
-            worker pulls from all queues.
-          title: Queues
-        state:
-          $ref: '#/components/schemas/EdgeWorkerState'
-          description: State of the worker from the view of the worker.
-        maintenance_comments:
-          description: Comments about the maintenance state of the worker.
-          title: Maintenance Comments
-          anyOf:
-          - type: string
-          - type: object
-            nullable: true
-      title: WorkerSetStateReturn
-    EdgeJobFetched:
-      description: Job that is to be executed on the edge worker.
-      properties:
-        command:
-          description: Command line to use to execute the job.
-          items:
-            type: string
-          title: Command
-          type: array
-        concurrency_slots:
-          description: Number of slots to use for the task.
-          title: Concurrency Slots
-          type: integer
-        dag_id:
-          description: Identifier of the DAG to which the task belongs.
-          title: Dag ID
-          type: string
-        map_index:
-          description: For dynamically mapped tasks the mapping number, -1 if the
-            task is not mapped.
-          title: Map Index
-          type: integer
-        run_id:
-          description: Run ID of the DAG execution.
-          title: Run ID
-          type: string
-        task_id:
-          description: Task name in the DAG.
-          title: Task ID
-          type: string
-        try_number:
-          description: The number of attempt to execute this task.
-          title: Try Number
-          type: integer
-      required:
-      - dag_id
-      - task_id
-      - run_id
-      - map_index
-      - try_number
-      - command
-      title: EdgeJobFetched
-      type: object
-    TaskInstanceState:
-      description: 'All possible states that a Task Instance can be in.
-
-
-        Note that None is also allowed, so always use this in a type hint with Optional.'
-      enum:
-      - removed
-      - scheduled
-      - queued
-      - running
-      - success
-      - restarting
-      - failed
-      - up_for_retry
-      - up_for_reschedule
-      - upstream_failed
-      - skipped
-      - deferred
-      title: TaskInstanceState
-      type: string
-    PushLogsBody:
-      description: Incremental new log content from worker.
-      properties:
-        log_chunk_data:
-          description: Log chunk data as incremental log text.
-          title: Log Chunk Data
-          type: string
-        log_chunk_time:
-          description: Time of the log chunk at point of sending.
-          format: date-time
-          title: Log Chunk Time
-          type: string
-      required:
-      - log_chunk_time
-      - log_chunk_data
-      title: PushLogsBody
-      type: object
-    HTTPExceptionResponse:
-      description: HTTPException Model used for error response.
-      properties:
-        detail:
-          anyOf:
-          - type: string
-          - type: object
-          title: Detail
-      required:
-      - detail
-      title: HTTPExceptionResponse
-      type: object
-    HTTPValidationError:
-      properties:
-        detail:
-          items:
-            $ref: '#/components/schemas/ValidationError'
-          title: Detail
-          type: array
-      title: HTTPValidationError
-      type: object
-    ValidationError:
-      properties:
-        loc:
-          items:
-            anyOf:
-            - type: string
-            - type: integer
-          title: Location
-          type: array
-        msg:
-          title: Message
-          type: string
-        type:
-          title: Error Type
-          type: string
-      required:
-      - loc
-      - msg
-      - type
-      title: ValidationError
-      type: object
-
-tags: []
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
index 110e9efb7fc..b8aa49a642f 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
@@ -23,188 +23,35 @@ from typing import TYPE_CHECKING, Any
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.plugins_manager import AirflowPlugin
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
 from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.db import DBLocks, create_global_lock
+from airflow.utils.db import DBLocks, create_global_lock
 
-    @provide_session
-    def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
-        # Ensure all required DB modeals are created before starting the API
-        with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-            engine = session.get_bind().engine
-            from airflow.providers.edge3.models.edge_job import EdgeJobModel
-            from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
-            from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
 
-            EdgeJobModel.metadata.create_all(engine)
-            EdgeLogsModel.metadata.create_all(engine)
-            EdgeWorkerModel.metadata.create_all(engine)
+@provide_session
+def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
+    # Ensure all required DB models are created before starting the API
+    with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+        engine = session.get_bind().engine
+        from airflow.providers.edge3.models.edge_job import EdgeJobModel
+        from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
+        from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
 
-        from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app
+        EdgeJobModel.metadata.create_all(engine)
+        EdgeLogsModel.metadata.create_all(engine)
+        EdgeWorkerModel.metadata.create_all(engine)
 
-        return {
-            "app": create_edge_worker_api_app(),
-            "url_prefix": "/edge_worker",
-            "name": "Airflow Edge Worker",
-        }
+    from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app
 
-else:
-    # This is for back-compatibility with Airflow 2.x and we only make this
-    # to prevents dependencies and breaking imports in Airflow 3.x
-    import re
-    from datetime import datetime, timedelta
-    from pathlib import Path
-
-    from flask import Blueprint, redirect, request, url_for
-    from flask_appbuilder import BaseView, expose
-    from markupsafe import Markup
-    from sqlalchemy import select
-
-    from airflow.auth.managers.models.resource_details import AccessView
-    from airflow.utils.state import State, TaskInstanceState
-    from airflow.utils.yaml import safe_load
-    from airflow.www.auth import has_access_view
-
-    def _get_airflow_2_api_endpoint() -> Blueprint:
-        from airflow.www.app import csrf
-        from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
-        from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
-
-        folder = Path(__file__).parents[1].resolve()  # this is airflow/providers/edge3/
-        with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f:
-            specification = safe_load(f)
-        from connexion import FlaskApi
-
-        bp = FlaskApi(
-            specification=specification,
-            resolver=_LazyResolver(),
-            base_path="/edge_worker/v1",
-            strict_validation=True,
-            options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": 
SWAGGER_BUNDLE.__fspath__()},
-            validate_responses=True,
-            validator_map={"body": _CustomErrorRequestBodyValidator},
-        ).blueprint
-        # Need to exempt CSRF to make API usable
-        csrf.exempt(bp)
-        return bp
-
-    def _state_token(state):
-        """Return a formatted string with HTML for a given State."""
-        color = State.color(state)
-        fg_color = State.color_fg(state)
-        return Markup(
-            """
-            <span class="label" style="color:{fg_color}; 
background-color:{color};"
-                title="Current State: {state}">{state}</span>
-            """
-        ).format(color=color, state=state, fg_color=fg_color)
-
-    def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str:
-        if maintenance_comment:
-            if re.search(
-                r"^\[[-\d:\s]+\] - .+ put node into maintenance 
mode\r?\nComment:.*", maintenance_comment
-            ):
-                return re.sub(
-                    r"^\[[-\d:\s]+\] - .+ put node into maintenance 
mode\r?\nComment:",
-                    f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - 
{username} updated maintenance mode\nComment:",
-                    maintenance_comment,
-                )
-            if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance 
mode\r?\nComment:.*", maintenance_comment):
-                return re.sub(
-                    r"^\[[-\d:\s]+\] - .+ updated maintenance 
mode\r?\nComment:",
-                    f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - 
{username} updated maintenance mode\nComment:",
-                    maintenance_comment,
-                )
-            return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - 
{username} updated maintenance mode\nComment: {maintenance_comment}"
-        return (
-            f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} 
updated maintenance mode\nComment:"
-        )
-
-    # registers airflow/providers/edge3/plugins/templates as a Jinja template folder
-    template_bp = Blueprint(
-        "template_blueprint",
-        __name__,
-        template_folder="templates",
-    )
-
-    class EdgeWorkerJobs(BaseView):
-        """Simple view to show Edge Worker jobs."""
-
-        default_view = "jobs"
-
-        @expose("/jobs")
-        @has_access_view(AccessView.JOBS)
-        @provide_session
-        def jobs(self, session: Session = NEW_SESSION):
-            from airflow.providers.edge3.models.edge_job import EdgeJobModel
-
-            jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all()
-            html_states = {
-                str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values()
-            }
-            return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states)
-
-    class EdgeWorkerHosts(BaseView):
-        """Simple view to show Edge Worker status."""
-
-        default_view = "status"
-
-        @expose("/status")
-        @has_access_view(AccessView.JOBS)
-        @provide_session
-        def status(self, session: Session = NEW_SESSION):
-            from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
-
-            hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all()
-            five_min_ago = datetime.now() - timedelta(minutes=5)
-            return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago)
-
-        @expose("/status/maintenance/<string:worker_name>/on", 
methods=["POST"])
-        @has_access_view(AccessView.JOBS)
-        def worker_to_maintenance(self, worker_name: str):
-            from flask_login import current_user
-
-            from airflow.providers.edge3.models.edge_worker import request_maintenance
-
-            maintenance_comment = request.form.get("maintenance_comment")
-            maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}"
-            request_maintenance(worker_name, maintenance_comment)
-            return redirect(url_for("EdgeWorkerHosts.status"))
-
-        @expose("/status/maintenance/<string:worker_name>/off", 
methods=["POST"])
-        @has_access_view(AccessView.JOBS)
-        def remove_worker_from_maintenance(self, worker_name: str):
-            from airflow.providers.edge3.models.edge_worker import exit_maintenance
-
-            exit_maintenance(worker_name)
-            return redirect(url_for("EdgeWorkerHosts.status"))
-
-        @expose("/status/maintenance/<string:worker_name>/remove", 
methods=["POST"])
-        @has_access_view(AccessView.JOBS)
-        def remove_worker(self, worker_name: str):
-            from airflow.providers.edge3.models.edge_worker import remove_worker
-
-            remove_worker(worker_name)
-            return redirect(url_for("EdgeWorkerHosts.status"))
-
-        @expose("/status/maintenance/<string:worker_name>/change_comment", 
methods=["POST"])
-        @has_access_view(AccessView.JOBS)
-        def change_maintenance_comment(self, worker_name: str):
-            from flask_login import current_user
-
-            from airflow.providers.edge3.models.edge_worker import change_maintenance_comment
-
-            maintenance_comment = request.form.get("maintenance_comment")
-            maintenance_comment = modify_maintenance_comment_on_update(
-                maintenance_comment, current_user.username
-            )
-            change_maintenance_comment(worker_name, maintenance_comment)
-            return redirect(url_for("EdgeWorkerHosts.status"))
+    return {
+        "app": create_edge_worker_api_app(),
+        "url_prefix": "/edge_worker",
+        "name": "Airflow Edge Worker",
+    }
 
 
 # Check if EdgeExecutor is actually loaded
@@ -213,17 +60,14 @@ try:
 except AirflowConfigException:
     EDGE_EXECUTOR_ACTIVE = False
 
-# Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x)
-# todo(jscheffl): Remove this check when the discussion in
+# Load the API endpoint only on api-server
+# TODO(jscheffl): Remove this check when the discussion in
 #                 https://lists.apache.org/thread/w170czq6r7bslkqp1tk6bjjjo0789wgl
 #                 resulted in a proper API to selective initialize. Maybe backcompat-shim
 #                 is also needed to support Airflow-versions prior the rework.
-if AIRFLOW_V_3_0_PLUS:
-    RUNNING_ON_APISERVER = (len(sys.argv) > 1 and sys.argv[1] in ["api-server"]) or (
-        len(sys.argv) > 2 and sys.argv[2] == "airflow-core/src/airflow/api_fastapi/main.py"
-    )
-else:
-    RUNNING_ON_APISERVER = "gunicorn" in sys.argv[0] and "airflow-webserver" in sys.argv
+RUNNING_ON_APISERVER = (len(sys.argv) > 1 and sys.argv[1] in ["api-server"]) or (
+    len(sys.argv) > 2 and sys.argv[2] == "airflow-core/src/airflow/api_fastapi/main.py"
+)
 
 
 def _get_base_url_path(path: str) -> str:
@@ -247,8 +91,9 @@ class EdgeExecutorPlugin(AirflowPlugin):
 
     name = "edge_executor"
     if EDGE_EXECUTOR_ACTIVE and RUNNING_ON_APISERVER:
+        fastapi_apps = [_get_api_endpoint()]
         if AIRFLOW_V_3_1_PLUS:
-            fastapi_apps = [_get_api_endpoint()]
+            # Airflow 3.0 does not know about react_apps, so we only provide the API endpoint
             react_apps = [
                 {
                     "name": "Edge Executor",
@@ -271,27 +116,3 @@ class EdgeExecutorPlugin(AirflowPlugin):
                     "url_route": "edge_worker_api_docs",
                 }
             ]
-        if AIRFLOW_V_3_0_PLUS:
-            # Airflow 3.0 does not know about react_apps, so we only provide the API endpoint
-            fastapi_apps = [_get_api_endpoint()]
-        else:
-            appbuilder_menu_items = [
-                {
-                    "name": "Edge Worker API docs",
-                    "href": _get_base_url_path("/edge_worker/v1/ui"),
-                    "category": "Docs",
-                }
-            ]
-            appbuilder_views = [
-                {
-                    "name": "Edge Worker Jobs",
-                    "category": "Admin",
-                    "view": EdgeWorkerJobs(),
-                },
-                {
-                    "name": "Edge Worker Hosts",
-                    "category": "Admin",
-                    "view": EdgeWorkerHosts(),
-                },
-            ]
-            flask_blueprints = [_get_airflow_2_api_endpoint(), template_bp]
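
For context, the `fastapi_apps` plugin field that replaces the Flask blueprints above is how an Airflow 3 plugin mounts a FastAPI sub-application on the api-server. A minimal sketch with hypothetical names, using the same dict shape that `_get_api_endpoint()` returns (not part of this commit):

    from fastapi import FastAPI

    from airflow.plugins_manager import AirflowPlugin

    demo_app = FastAPI(title="Demo sub-app")

    @demo_app.get("/ping")
    def ping() -> dict[str, str]:
        # Served under the plugin's url_prefix, e.g. GET /demo/ping.
        return {"status": "ok"}

    class DemoPlugin(AirflowPlugin):
        name = "demo_plugin"
        # One dict per sub-app: the app object, its mount prefix, and a display name.
        fastapi_apps = [{"app": demo_app, "url_prefix": "/demo", "name": "Demo"}]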
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json b/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
index 14b306a2e91..911d5353f5d 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
@@ -27,7 +27,7 @@
     "lint:fix": "eslint --fix && tsc --p tsconfig.app.json",
     "format": "pnpm prettier --write .",
     "preview": "vite preview",
-    "codegen": "openapi-rq -i ../../openapi/v2-edge-generated.yaml -c axios 
--format prettier -o openapi-gen --operationId",
+    "codegen": "openapi-rq -i ../../worker_api/v2-edge-generated.yaml -c axios 
--format prettier -o openapi-gen --operationId",
     "test": "vitest run",
     "coverage": "vitest run --coverage"
   },
diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py b/providers/edge3/src/airflow/providers/edge3/version_compat.py
index 209e8b63f35..27070ab292b 100644
--- a/providers/edge3/src/airflow/providers/edge3/version_compat.py
+++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py
@@ -32,10 +32,8 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
 
 __all__ = [
-    "AIRFLOW_V_3_0_PLUS",
     "AIRFLOW_V_3_1_PLUS",
 ]
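
With the 3.0 flag gone, `AIRFLOW_V_3_1_PLUS` is the only remaining boundary. A sketch of the intended consumption pattern (assumed usage, mirroring the plugin code earlier in this commit):

    from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS

    if AIRFLOW_V_3_1_PLUS:
        # Gate features that only exist from Airflow 3.1 onwards (e.g. react_apps);
        # Airflow 3.0 is now the minimum supported version, so it needs no flag.
        ...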
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
index 2188d8c7614..a29fd42b2d1 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
@@ -20,6 +20,7 @@ import logging
 from functools import cache
 from uuid import uuid4
 
+from fastapi import Header, HTTPException, Request, status
 from itsdangerous import BadSignature
 from jwt import (
     ExpiredSignatureError,
@@ -29,49 +30,24 @@ from jwt import (
     InvalidSignatureError,
 )
 
+from airflow.api_fastapi.auth.tokens import JWTValidator
 from airflow.configuration import conf
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.edge3.worker_api.datamodels import JsonRpcRequestBase  # noqa: TCH001
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
-    Header,
-    HTTPException,
-    Request,
-    status,
-)
 
 log = logging.getLogger(__name__)
 
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.api_fastapi.auth.tokens import JWTValidator
-
-    @cache
-    def jwt_validator() -> JWTValidator:
-        return JWTValidator(
-            secret_key=conf.get("api_auth", "jwt_secret"),
-            leeway=conf.getint("api_auth", "jwt_leeway", fallback=30),
-            audience="api",
-        )
+@cache
+def jwt_validator() -> JWTValidator:
+    return JWTValidator(
+        secret_key=conf.get("api_auth", "jwt_secret"),
+        leeway=conf.getint("api_auth", "jwt_leeway", fallback=30),
+        audience="api",
+    )
 
-    def jwt_validate(authorization: str) -> dict:
-        return jwt_validator().validated_claims(authorization)
-
-else:
-    # Airflow 2.10 compatibility
-    from airflow.utils.jwt_signer import JWTSigner
-
-    @cache
-    def jwt_signer() -> JWTSigner:
-        clock_grace = conf.getint("core", "internal_api_clock_grace", 
fallback=30)
-        return JWTSigner(
-            secret_key=conf.get("core", "internal_api_secret_key"),
-            expiration_time_in_seconds=clock_grace,
-            leeway_in_seconds=clock_grace,
-            audience="api",
-        )
 
-    def jwt_validate(authorization: str) -> dict:
-        return jwt_signer().verify_token(authorization)
+def jwt_validate(authorization: str) -> dict:
+    return jwt_validator().validated_claims(authorization)
 
 
 def _forbidden_response(message: str):
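
As a usage sketch (assumed caller code, not part of the diff): since `validated_claims()` raises on bad input, callers can treat `jwt_validate()` as pass/fail, assuming the PyJWT exceptions imported at the top of this module propagate:

    from jwt import ExpiredSignatureError, InvalidSignatureError

    def is_authorized(authorization: str) -> bool:
        # jwt_validate() either returns the claims dict or raises; map the
        # expiry/signature failure modes to a simple boolean for the caller.
        try:
            jwt_validate(authorization)
            return True
        except (ExpiredSignatureError, InvalidSignatureError):
            return False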
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
index 0d4530d8607..bd7b60c8b15 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
@@ -22,11 +22,12 @@ from typing import (
     Any,
 )
 
+from fastapi import Path
 from pydantic import BaseModel, Field
 
+from airflow.executors.workloads import ExecuteTask  # noqa: TCH001
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerState  # noqa: TCH001
-from airflow.providers.edge3.worker_api.routes._v2_compat import ExecuteTask, Path
 
 
 class WorkerApiDocs:
@@ -93,7 +94,7 @@ class EdgeJobFetched(EdgeJobBase):
         ExecuteTask,
         Field(
             title="Command",
-            description="Command line to use to execute the job in Airflow 2. 
Task definition in Airflow 3",
+            description="Command line to use to execute the job in Airflow",
         ),
     ]
     concurrency_slots: Annotated[int, Field(description="Number of concurrency slots the job requires.")]
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py
deleted file mode 100644
index ffe1cb09249..00000000000
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Compatibility layer for API to provide both FastAPI as well as Connexion 
based endpoints."""
-
-from __future__ import annotations
-
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    # Just re-import the types from FastAPI and Airflow Core
-    from fastapi import Body, Depends, Header, HTTPException, Path, Request, status
-
-    from airflow.api_fastapi.common.db.common import SessionDep
-    from airflow.api_fastapi.common.router import AirflowRouter
-    from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-
-    # In Airflow 3 with AIP-72 we get workload addressed by ExecuteTask
-    from airflow.executors.workloads import ExecuteTask
-
-    def parse_command(command: str) -> ExecuteTask:
-        return ExecuteTask.model_validate_json(command)
-else:
-    # Mock the external dependencies
-    from collections.abc import Callable
-
-    from connexion import ProblemException
-
-    class Body:  # type: ignore[no-redef]
-        def __init__(self, *_, **__):
-            pass
-
-    class Depends:  # type: ignore[no-redef]
-        def __init__(self, *_, **__):
-            pass
-
-    class Header:  # type: ignore[no-redef]
-        def __init__(self, *_, **__):
-            pass
-
-    class Path:  # type: ignore[no-redef]
-        def __init__(self, *_, **__):
-            pass
-
-    class Request:  # type: ignore[no-redef]
-        pass
-
-    class SessionDep:  # type: ignore[no-redef]
-        pass
-
-    def create_openapi_http_exception_doc(responses_status_code: list[int]) -> dict:
-        return {}
-
-    class status:  # type: ignore[no-redef]
-        HTTP_204_NO_CONTENT = 204
-        HTTP_400_BAD_REQUEST = 400
-        HTTP_403_FORBIDDEN = 403
-        HTTP_404_NOT_FOUND = 404
-        HTTP_409_CONFLICT = 409
-        HTTP_500_INTERNAL_SERVER_ERROR = 500
-
-    class HTTPException(ProblemException):  # type: ignore[no-redef]
-        """Raise when the user does not have the required permissions."""
-
-        def __init__(
-            self,
-            status: int,
-            detail: str,
-        ) -> None:
-            from airflow.utils.docs import get_docs_url
-
-            doc_link = get_docs_url("stable-rest-api-ref.html")
-            EXCEPTIONS_LINK_MAP = {
-                400: f"{doc_link}#section/Errors/BadRequest",
-                403: f"{doc_link}#section/Errors/PermissionDenied",
-                409: f"{doc_link}#section/Errors/Conflict",
-                500: f"{doc_link}#section/Errors/Unknown",
-            }
-            TITLE_MAP = {
-                400: "BadRequest",
-                403: "PermissionDenied",
-                409: "Conflict",
-                500: "InternalServerError",
-            }
-            super().__init__(
-                status=status,
-                type=EXCEPTIONS_LINK_MAP[status],
-                title=TITLE_MAP[status],
-                detail=detail,
-            )
-
-        @property
-        def status_code(self) -> int:
-            """Alias for status to match FastAPI's HTTPException interface."""
-            return self.status
-
-        def to_response(self):
-            from flask import Response
-
-            return Response(response=self.detail, status=self.status)
-
-    class AirflowRouter:  # type: ignore[no-redef]
-        def __init__(self, *_, **__):
-            pass
-
-        def get(self, *_, **__):
-            def decorator(func: Callable) -> Callable:
-                return func
-
-            return decorator
-
-        def post(self, *_, **__):
-            def decorator(func: Callable) -> Callable:
-                return func
-
-            return decorator
-
-        def patch(self, *_, **__):
-            def decorator(func: Callable) -> Callable:
-                return func
-
-            return decorator
-
-    # In Airflow 3 with AIP-72 we get workload addressed by ExecuteTask
-    # But in Airflow 2.10 it is a command line array
-    ExecuteTask = list[str]  # type: ignore[assignment,misc]
-
-    def parse_command(command: str) -> ExecuteTask:
-        from ast import literal_eval
-
-        return literal_eval(command)
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py
deleted file mode 100644
index a275246d078..00000000000
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py
+++ /dev/null
@@ -1,237 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Compatibility layer for Connexion API to Airflow v2.10 API routes."""
-
-from __future__ import annotations
-
-import json
-import logging
-from typing import TYPE_CHECKING, Any
-from uuid import uuid4
-
-from flask import Response, request
-
-from airflow.exceptions import AirflowException
-from airflow.providers.edge3.worker_api.auth import (
-    jwt_token_authorization,
-    jwt_token_authorization_rpc,
-)
-from airflow.providers.edge3.worker_api.datamodels import (
-    EdgeJobFetched,
-    JsonRpcRequest,
-    PushLogsBody,
-    WorkerQueuesBody,
-    WorkerStateBody,
-)
-from airflow.providers.edge3.worker_api.routes._v2_compat import HTTPException, status
-from airflow.providers.edge3.worker_api.routes.jobs import fetch, state as state_api
-from airflow.providers.edge3.worker_api.routes.logs import logfile_path, push_logs
-from airflow.providers.edge3.worker_api.routes.worker import register, set_state
-from airflow.serialization.serialized_objects import BaseSerialization
-from airflow.utils.session import NEW_SESSION, create_session, provide_session
-
-if TYPE_CHECKING:
-    from airflow.api_connexion.types import APIResponse
-    from airflow.utils.state import TaskInstanceState
-
-
-log = logging.getLogger(__name__)
-
-
-def error_response(message: str, status: int):
-    """Log the error and return the response as JSON object."""
-    error_id = uuid4()
-    server_message = f"{message} error_id={error_id}"
-    log.exception(server_message)
-    client_message = f"{message} The server side traceback may be identified 
with error_id={error_id}"
-    return HTTPException(status, client_message)
-
-
-def rpcapi_v2(body: dict[str, Any]) -> APIResponse:
-    """Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint for Airflow 
2.10."""
-    # Note: Except the method map this _was_ a 100% copy of internal API module
-    #       
airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
-    # As of rework for FastAPI in Airflow 3.0, this is updated and to be 
removed in the future.
-    from airflow.api_internal.endpoints.rpc_api_endpoint import (
-        # Note: This is just for compatibility with Airflow 2.10, not working 
for Airflow 3 / main as removed
-        initialize_method_map,
-    )
-
-    try:
-        if request.headers.get("Content-Type", "") != "application/json":
-            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected 
Content-Type: application/json")
-        if request.headers.get("Accept", "") != "application/json":
-            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Accept: 
application/json")
-        auth = request.headers.get("Authorization", "")
-        request_obj = JsonRpcRequest(method=body["method"], 
jsonrpc=body["jsonrpc"], params=body["params"])
-        jwt_token_authorization_rpc(request_obj, auth)
-        if request_obj.jsonrpc != "2.0":
-            raise error_response("Expected jsonrpc 2.0 request.", 
status.HTTP_400_BAD_REQUEST)
-
-        log.debug("Got request for %s", request_obj.method)
-        methods_map = initialize_method_map()
-        if request_obj.method not in methods_map:
-            raise error_response(f"Unrecognized method: 
{request_obj.method}.", status.HTTP_400_BAD_REQUEST)
-
-        handler = methods_map[request_obj.method]
-        params = {}
-        try:
-            if request_obj.params:
-                # Note, this is Airflow 2.10 specific, as it uses Pydantic 
models for serialization
-                params = BaseSerialization.deserialize(request_obj.params, 
use_pydantic_models=True)  # type: ignore[call-arg]
-        except Exception:
-            raise error_response("Error deserializing parameters.", 
status.HTTP_400_BAD_REQUEST)
-
-        log.debug("Calling method %s\nparams: %s", request_obj.method, params)
-        try:
-            # Session must be created there as it may be needed by serializer 
for lazy-loaded fields.
-            with create_session() as session:
-                output = handler(**params, session=session)
-                # Note, this is Airflow 2.10 specific, as it uses Pydantic 
models for serialization
-                output_json = BaseSerialization.serialize(output, 
use_pydantic_models=True)  # type: ignore[call-arg]
-                log.debug(
-                    "Sending response: %s", json.dumps(output_json) if 
output_json is not None else None
-                )
-        # In case of AirflowException or other selective known types, 
transport the exception class back to caller
-        except (KeyError, AttributeError, AirflowException) as e:
-            # Note, this is Airflow 2.10 specific, as it uses Pydantic models 
for serialization
-            output_json = BaseSerialization.serialize(e, 
use_pydantic_models=True)  # type: ignore[call-arg]
-            log.debug(
-                "Sending exception response: %s", json.dumps(output_json) if 
output_json is not None else None
-            )
-        except Exception:
-            raise error_response(
-                f"Error executing method '{request_obj.method}'.", 
status.HTTP_500_INTERNAL_SERVER_ERROR
-            )
-        response = json.dumps(output_json) if output_json is not None else None
-        return Response(response=response, headers={"Content-Type": 
"application/json"})
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-def jwt_token_authorization_v2(method: str, authorization: str):
-    """Proxy for v2 method path handling."""
-    PREFIX = "/edge_worker/v1/"
-    method_path = method[method.find(PREFIX) + len(PREFIX) :] if PREFIX in method else method
-    jwt_token_authorization(method_path, authorization)
-
-
-@provide_session
-def register_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
-    """Handle Edge Worker API `/edge_worker/v1/worker/{worker_name}` endpoint for Airflow 2.10."""
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        request_obj = WorkerStateBody(
-            state=body["state"], jobs_active=0, queues=body["queues"], sysinfo=body["sysinfo"]
-        )
-        return register(worker_name, request_obj, session).model_dump()
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-@provide_session
-def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
-    """Handle Edge Worker API `/edge_worker/v1/worker/{worker_name}` endpoint for Airflow 2.10."""
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        request_obj = WorkerStateBody(
-            state=body["state"],
-            jobs_active=body["jobs_active"],
-            queues=body["queues"],
-            sysinfo=body["sysinfo"],
-            maintenance_comments=body.get("maintenance_comments"),
-        )
-        return set_state(worker_name, request_obj, session).model_dump()
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-@provide_session
-def job_fetch_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
-    """Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10."""
-    from flask import request
-
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        queues = body.get("queues")
-        free_concurrency = body.get("free_concurrency", 1)
-        request_obj = WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency)
-        job: EdgeJobFetched | None = fetch(worker_name, request_obj, session)
-        return job.model_dump() if job is not None else None
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-@provide_session
-def job_state_v2(
-    dag_id: str,
-    task_id: str,
-    run_id: str,
-    try_number: int,
-    map_index: str,  # Note: Connexion can not have negative numbers in path parameters, use string therefore
-    state: TaskInstanceState,
-    session=NEW_SESSION,
-) -> Any:
-    """Handle Edge Worker API 
`/jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}` 
endpoint for Airflow 2.10."""
-    from flask import request
-
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        state_api(dag_id, task_id, run_id, try_number, int(map_index), state, session)
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-def logfile_path_v2(
-    dag_id: str,
-    task_id: str,
-    run_id: str,
-    try_number: int,
-    map_index: str,  # Note: Connexion can not have negative numbers in path parameters, use string therefore
-) -> str:
-    """Handle Edge Worker API 
`/edge_worker/v1/logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}`
 endpoint for Airflow 2.10."""
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        return logfile_path(dag_id, task_id, run_id, try_number, int(map_index))
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
-
-
-def push_logs_v2(
-    dag_id: str,
-    task_id: str,
-    run_id: str,
-    try_number: int,
-    map_index: str,  # Note: Connexion can not have negative numbers in path parameters, use string therefore
-    body: dict[str, Any],
-) -> None:
-    """Handle Edge Worker API 
`/edge_worker/v1/logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}`
 endpoint for Airflow 2.10."""
-    try:
-        auth = request.headers.get("Authorization", "")
-        jwt_token_authorization_v2(request.path, auth)
-        request_obj = PushLogsBody(
-            log_chunk_data=body["log_chunk_data"], 
log_chunk_time=body["log_chunk_time"]
-        )
-        with create_session() as session:
-            push_logs(dag_id, task_id, run_id, try_number, int(map_index), 
request_obj, session)
-    except HTTPException as e:
-        return e.to_response()  # type: ignore[attr-defined]
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
index 557da25077f..1281ec2f8e0 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
@@ -17,7 +17,7 @@
 
 from __future__ import annotations
 
-from airflow.providers.edge3.worker_api.routes._v2_compat import AirflowRouter
+from airflow.api_fastapi.common.router import AirflowRouter
 
 health_router = AirflowRouter(tags=["Monitor"])
 
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
index 9f0c282fb70..c39e9dc1814 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
@@ -19,8 +19,13 @@ from __future__ import annotations
 
 from typing import Annotated
 
+from fastapi import Body, Depends, status
 from sqlalchemy import select, update
 
+from airflow.api_fastapi.common.db.common import SessionDep  # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.executors.workloads import ExecuteTask
 from airflow.providers.common.compat.sdk import Stats, timezone
 from airflow.providers.edge3.models.edge_job import EdgeJobModel
 from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
@@ -29,20 +34,15 @@ from airflow.providers.edge3.worker_api.datamodels import (
     WorkerApiDocs,
     WorkerQueuesBody,
 )
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
-    AirflowRouter,
-    Body,
-    Depends,
-    SessionDep,
-    create_openapi_http_exception_doc,
-    parse_command,
-    status,
-)
 from airflow.utils.state import TaskInstanceState
 
 jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
 
 
+def parse_command(command: str) -> ExecuteTask:
+    return ExecuteTask.model_validate_json(command)
+
+
 @jobs_router.post(
     "/fetch/{worker_name}",
     dependencies=[Depends(jwt_token_authorization_rest)],
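
The new local `parse_command()` is plain pydantic JSON validation, so any serialized `ExecuteTask` round-trips. A sketch using a payload shaped like the `MOCK_COMMAND` dict from the tests further down (illustrative only; `payload` stands in for the JSON stored in `EdgeJobModel.command`):

    import json

    payload = json.dumps(MOCK_COMMAND)  # MOCK_COMMAND as in test_worker.py below
    workload = parse_command(payload)   # -> ExecuteTask instance
    assert workload.ti.task_id == "mock"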
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
index 900ae18b012..064808b2b04 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
@@ -21,20 +21,17 @@ from functools import cache
 from pathlib import Path
 from typing import TYPE_CHECKING, Annotated
 
+from fastapi import Body, Depends, status
+
+from airflow.api_fastapi.common.db.common import SessionDep  # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.configuration import conf
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
 from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
 from airflow.providers.edge3.worker_api.datamodels import PushLogsBody, WorkerApiDocs
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
-    AirflowRouter,
-    Body,
-    Depends,
-    SessionDep,
-    create_openapi_http_exception_doc,
-    status,
-)
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.session import NEW_SESSION, provide_session
 
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 4e2298c8adb..07054e895b8 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -20,8 +20,12 @@ from __future__ import annotations
 import json
 from typing import Annotated
 
+from fastapi import Body, Depends, HTTPException, Path, status
 from sqlalchemy import select
 
+from airflow.api_fastapi.common.db.common import SessionDep  # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.providers.common.compat.sdk import Stats, timezone
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
 from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
@@ -31,16 +35,6 @@ from airflow.providers.edge3.worker_api.datamodels import (
     WorkerSetStateReturn,
     WorkerStateBody,
 )
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
-    AirflowRouter,
-    Body,
-    Depends,
-    HTTPException,
-    Path,
-    SessionDep,
-    create_openapi_http_exception_doc,
-    status,
-)
 
 worker_router = AirflowRouter(
     tags=["Worker"],
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
similarity index 99%
rename from providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
rename to providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
index 36e8a526411..a6567af3bcb 100644
--- a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
@@ -898,8 +898,7 @@ components:
         command:
           $ref: '#/components/schemas/ExecuteTask'
           title: Command
-          description: Command line to use to execute the job in Airflow 2. Task definition
-            in Airflow 3
+          description: Command line to use to execute the job in Airflow
         concurrency_slots:
           type: integer
           title: Concurrency Slots
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 515421b5297..b8d7435a985 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -49,34 +49,29 @@ from airflow.providers.edge3.worker_api.datamodels import (
 from airflow.utils.state import TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytest.importorskip("pydantic", minversion="2.0.0")
 
 
-MOCK_COMMAND = (
-    {
-        "token": "mock",
-        "ti": {
-            "id": "4d828a62-a417-4936-a7a6-2b3fabacecab",
-            "task_id": "mock",
-            "dag_id": "mock",
-            "run_id": "mock",
-            "try_number": 1,
-            "dag_version_id": "01234567-89ab-cdef-0123-456789abcdef",
-            "pool_slots": 1,
-            "queue": "default",
-            "priority_weight": 1,
-            "start_date": "2023-01-01T00:00:00+00:00",
-            "map_index": -1,
-        },
-        "dag_rel_path": "mock.py",
-        "log_path": "mock.log",
-        "bundle_info": {"name": "hello", "version": "abc"},
-    }
-    if AIRFLOW_V_3_0_PLUS
-    else ["test", "command"]  # Airflow 2.10
-)
+MOCK_COMMAND = {
+    "token": "mock",
+    "ti": {
+        "id": "4d828a62-a417-4936-a7a6-2b3fabacecab",
+        "task_id": "mock",
+        "dag_id": "mock",
+        "run_id": "mock",
+        "try_number": 1,
+        "dag_version_id": "01234567-89ab-cdef-0123-456789abcdef",
+        "pool_slots": 1,
+        "queue": "default",
+        "priority_weight": 1,
+        "start_date": "2023-01-01T00:00:00+00:00",
+        "map_index": -1,
+    },
+    "dag_rel_path": "mock.py",
+    "log_path": "mock.log",
+    "bundle_info": {"name": "hello", "version": "abc"},
+}
 
 
 class _MockPopen(Popen):
@@ -141,10 +136,7 @@ class TestEdgeWorker:
         return test_edgeworker
 
     @patch("airflow.providers.edge3.cli.worker.Process")
-    @patch("airflow.providers.edge3.cli.worker.logs_logfile_path")
-    @patch("airflow.providers.edge3.cli.worker.Popen")
-    def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_with_job: EdgeWorker):
-        mock_popen.side_effect = [MagicMock()]
+    def test_launch_job(self, mock_process, worker_with_job: EdgeWorker):
         mock_process_instance = MagicMock()
         mock_process.side_effect = [mock_process_instance]
 
@@ -152,17 +144,12 @@ class TestEdgeWorker:
         with conf_vars({("edge", "api_url"): 
"https://invalid-api-test-endpoint"}):
             worker_with_job._launch_job(edge_job)
 
-        if AIRFLOW_V_3_0_PLUS:
-            assert mock_process.call_count == 1
-            assert mock_process_instance.start.call_count == 1
-        else:
-            assert mock_popen.call_count == 1
-            assert mock_logfile_path.call_count == 1
+        assert mock_process.call_count == 1
+        assert mock_process_instance.start.call_count == 1
 
         assert len(EdgeWorker.jobs) == 1
         assert EdgeWorker.jobs[0].edge_job == edge_job
 
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 
3+")
     @pytest.mark.parametrize(
         ("configs", "expected_url"),
         [
@@ -193,18 +180,14 @@ class TestEdgeWorker:
             url = EdgeWorker._execution_api_server_url()
             assert url == expected_url
 
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 
3+")
     @patch("airflow.sdk.execution_time.supervisor.supervise")
     @patch("airflow.providers.edge3.cli.worker.Process")
-    @patch("airflow.providers.edge3.cli.worker.Popen")
     def test_supervise_launch(
         self,
-        mock_popen,
         mock_process,
         mock_supervise,
         worker_with_job: EdgeWorker,
     ):
-        mock_popen.side_effect = [MagicMock()]
         mock_process_instance = MagicMock()
         mock_process.side_effect = [mock_process_instance]
 
@@ -237,14 +220,12 @@ class TestEdgeWorker:
         ],
     )
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
-    @patch("airflow.providers.edge3.cli.worker.logs_logfile_path")
     @patch("airflow.providers.edge3.cli.worker.jobs_set_state")
     @patch("subprocess.Popen")
     def test_fetch_job(
         self,
         mock_popen,
         mock_set_state,
-        mock_logfile_path,
         mock_reserve_task,
         reserve_result,
         fetch_result,
@@ -258,11 +239,6 @@ class TestEdgeWorker:
             got_job = worker_with_job.fetch_job()
         mock_reserve_task.assert_called_once()
         assert got_job == fetch_result
-        if AIRFLOW_V_3_0_PLUS:
-            # this is only called on Airflow 2.10, AIP-72 includes it
-            assert mock_logfile_path.call_count == 0
-        else:
-            assert mock_logfile_path.call_count == logfile_path_call_count
         assert mock_set_state.call_count == set_state_call_count
 
     def test_check_running_jobs_running(self, worker_with_job: EdgeWorker):
diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
index c49f03ead5e..b8f411ebd0e 100644
--- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
+++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-from copy import deepcopy
 from datetime import datetime, timedelta
 from unittest.mock import MagicMock, patch
 
@@ -33,7 +32,6 @@ from airflow.utils.session import create_session
 from airflow.utils.state import TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
@@ -58,43 +56,12 @@ class TestEdgeExecutor:
 
         return (executor, key)
 
-    @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="_process_tasks is not used 
in Airflow 3.0+")
-    def test__process_tasks_bad_command(self):
-        executor, key = self.get_test_executor()
-        task_tuple = (key, ["hello", "world"], None, None)
-        with pytest.raises(ValueError, match="The command must start with "):
-            executor._process_tasks([task_tuple])
-
-    @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="_process_tasks is not used 
in Airflow 3.0+")
-    @pytest.mark.parametrize(
-        ("pool_slots", "expected_concurrency"),
-        [
-            pytest.param(1, 1, id="default_pool_size"),
-            pytest.param(5, 5, id="increased_pool_size"),
-        ],
-    )
-    def test__process_tasks_ok_command(self, pool_slots, expected_concurrency):
-        executor, key = self.get_test_executor(pool_slots=pool_slots)
-        task_tuple = (key, ["airflow", "tasks", "run", "hello", "world"], 
None, None)
-        executor._process_tasks([task_tuple])
-
-        with create_session() as session:
-            jobs: list[EdgeJobModel] = session.query(EdgeJobModel).all()
-        assert len(jobs) == 1
-        assert jobs[0].dag_id == "test_dag"
-        assert jobs[0].run_id == "test_run"
-        assert jobs[0].task_id == "test_task"
-        assert jobs[0].concurrency_slots == expected_concurrency
-
     @patch(f"{Stats.__module__}.Stats.incr")
     def test_sync_orphaned_tasks(self, mock_stats_incr):
         executor = EdgeExecutor()
 
         delta_to_purge = timedelta(minutes=conf.getint("edge", 
"job_fail_purge") + 1)
-        if AIRFLOW_V_3_0_PLUS:
-            delta_to_orphaned_config_name = "task_instance_heartbeat_timeout"
-        else:
-            delta_to_orphaned_config_name = "scheduler_zombie_task_threshold"
+        delta_to_orphaned_config_name = "task_instance_heartbeat_timeout"
 
         delta_to_orphaned = timedelta(seconds=conf.getint("scheduler", 
delta_to_orphaned_config_name) + 1)
 
@@ -297,145 +264,6 @@ class TestEdgeExecutor:
                 else:
                     assert worker.state == EdgeWorkerState.IDLE
 
-    @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="API only available in 
Airflow <3.0")
-    def test_execute_async(self):
-        executor, key = self.get_test_executor()
-
-        # Need to apply "trick" which is used to pass pool_slots
-        executor.edge_queued_tasks = deepcopy(executor.queued_tasks)
-
-        executor.execute_async(key=key, command=["airflow", "tasks", "run", 
"hello", "world"])
-
-        with create_session() as session:
-            jobs = session.query(EdgeJobModel).all()
-            assert len(jobs) == 1
-
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="API only available in 
Airflow 3.0+")
-    def test_queue_workload(self):
-        from airflow.executors.workloads import ExecuteTask, TaskInstance
-
-        executor = self.get_test_executor()[0]
-
-        with pytest.raises(TypeError):
-            # Does not like the Airflow 2.10 type of workload
-            executor.queue_workload(command=["airflow", "tasks", "run", 
"hello", "world"])
-
-        workload = ExecuteTask(
-            token="mock",
-            ti=TaskInstance(
-                id="4d828a62-a417-4936-a7a6-2b3fabacecab",
-                task_id="mock",
-                dag_id="mock",
-                run_id="mock",
-                try_number=1,
-                pool_slots=1,
-                queue="default",
-                priority_weight=1,
-                start_date=timezone.utcnow(),
-                dag_version_id="4d828a62-a417-4936-a7a6-2b3fabacecab",
-            ),
-            dag_rel_path="mock.py",
-            log_path="mock.log",
-            bundle_info={"name": "n/a", "version": "no matter"},
-        )
-        executor.queue_workload(workload=workload)
-
-        with create_session() as session:
-            jobs = session.query(EdgeJobModel).all()
-            assert len(jobs) == 1
-
-    @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="API only available in 
Airflow <3.0")
-    def test_execute_async_updates_existing_job(self):
-        executor, key = self.get_test_executor()
-
-        # First insert a job with the same key
-        with create_session() as session:
-            session.add(
-                EdgeJobModel(
-                    dag_id=key.dag_id,
-                    run_id=key.run_id,
-                    task_id=key.task_id,
-                    map_index=key.map_index,
-                    try_number=key.try_number,
-                    state=TaskInstanceState.SCHEDULED,
-                    queue="default",
-                    concurrency_slots=1,
-                    command="old-command",
-                    last_update=timezone.utcnow(),
-                )
-            )
-            session.commit()
-
-        # Trigger execute_async which should update the existing job
-        executor.edge_queued_tasks = deepcopy(executor.queued_tasks)
-        executor.execute_async(key=key, command=["airflow", "tasks", "run", 
"new", "command"])
-
-        with create_session() as session:
-            jobs = session.query(EdgeJobModel).all()
-            assert len(jobs) == 1
-            job = jobs[0]
-            assert job.state == TaskInstanceState.QUEUED
-            assert job.command != "old-command"
-            assert "new" in job.command
-
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="API only available in 
Airflow 3.0+")
-    def test_queue_workload_updates_existing_job(self):
-        from uuid import uuid4
-
-        from airflow.executors.workloads import ExecuteTask, TaskInstance
-
-        executor = self.get_test_executor()[0]
-
-        key = TaskInstanceKey(dag_id="mock", run_id="mock", task_id="mock", 
map_index=-1, try_number=1)
-
-        # Insert an existing job
-        with create_session() as session:
-            session.add(
-                EdgeJobModel(
-                    dag_id=key.dag_id,
-                    task_id=key.task_id,
-                    run_id=key.run_id,
-                    map_index=key.map_index,
-                    try_number=key.try_number,
-                    state=TaskInstanceState.SCHEDULED,
-                    queue="default",
-                    command="old-command",
-                    concurrency_slots=1,
-                    last_update=timezone.utcnow(),
-                )
-            )
-            session.commit()
-
-        # Queue a workload with same key
-        workload = ExecuteTask(
-            token="mock",
-            ti=TaskInstance(
-                id=uuid4(),
-                task_id=key.task_id,
-                dag_id=key.dag_id,
-                run_id=key.run_id,
-                try_number=key.try_number,
-                map_index=key.map_index,
-                pool_slots=1,
-                queue="updated-queue",
-                priority_weight=1,
-                start_date=timezone.utcnow(),
-                dag_version_id=uuid4(),
-            ),
-            dag_rel_path="mock.py",
-            log_path="mock.log",
-            bundle_info={"name": "n/a", "version": "no matter"},
-        )
-
-        executor.queue_workload(workload=workload)
-
-        with create_session() as session:
-            jobs = session.query(EdgeJobModel).all()
-            assert len(jobs) == 1
-            job = jobs[0]
-            assert job.queue == "updated-queue"
-            assert job.command != "old-command"
-
     def test_revoke_task(self):
         """Test that revoke_task removes task from executor and database."""
         executor = EdgeExecutor()
diff --git a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
index 21729bb4900..0eea2ae9c0d 100644
--- a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
+++ b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
@@ -20,13 +20,11 @@ import importlib
 from unittest.mock import patch
 
 import pytest
-import time_machine
 
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.edge3.plugins import edge_executor_plugin
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 
 def test_plugin_inactive():
@@ -46,7 +44,7 @@ def test_plugin_inactive():
 
 @pytest.mark.db_test
 def test_plugin_active_apiserver():
-    mock_cli = ["airflow", "api-server"] if AIRFLOW_V_3_0_PLUS else 
["gunicorn", "airflow-webserver"]
+    mock_cli = ["airflow", "api-server"]
     with conf_vars({("edge", "api_enabled"): "true"}), patch("sys.argv", 
mock_cli):
         importlib.reload(edge_executor_plugin)
 
@@ -59,13 +57,9 @@ def test_plugin_active_apiserver():
         rep = EdgeExecutorPlugin()
         assert EDGE_EXECUTOR_ACTIVE
         assert RUNNING_ON_APISERVER
-        if AIRFLOW_V_3_0_PLUS:
-            assert len(rep.appbuilder_views) == 0
-            assert len(rep.flask_blueprints) == 0
-            assert len(rep.fastapi_apps) == 1
-        else:
-            assert len(rep.appbuilder_views) == 2
-            assert len(rep.flask_blueprints) == 2
+        assert len(rep.appbuilder_views) == 0
+        assert len(rep.flask_blueprints) == 0
+        assert len(rep.fastapi_apps) == 1
 
 
 @patch("sys.argv", ["airflow", "some-other-command"])
@@ -85,8 +79,7 @@ def test_plugin_active_non_apiserver():
         assert len(rep.appbuilder_views) == 0
         assert len(rep.flask_blueprints) == 0
         assert len(rep.appbuilder_views) == 0
-        if AIRFLOW_V_3_0_PLUS:
-            assert len(rep.fastapi_apps) == 0
+        assert len(rep.fastapi_apps) == 0
 
 
 @pytest.fixture
@@ -98,34 +91,3 @@ def plugin():
 
 def test_plugin_is_airflow_plugin(plugin):
     assert isinstance(plugin, AirflowPlugin)
-
-
[email protected](AIRFLOW_V_3_0_PLUS, reason="Plugin endpoint is not used in Airflow 3.0+")
[email protected](
-    ("initial_comment", "expected_comment"),
-    [
-        pytest.param(
-            "comment", "[2020-01-01 00:00] - user updated maintenance 
mode\nComment: comment", id="no user"
-        ),
-        pytest.param(
-            "[2019-01-01] - another user put node into maintenance 
mode\nComment:new comment",
-            "[2020-01-01 00:00] - user updated maintenance mode\nComment:new 
comment",
-            id="first update",
-        ),
-        pytest.param(
-            "[2019-01-01] - another user updated maintenance mode\nComment:new 
comment",
-            "[2020-01-01 00:00] - user updated maintenance mode\nComment:new 
comment",
-            id="second update",
-        ),
-        pytest.param(
-            None,
-            "[2020-01-01 00:00] - user updated maintenance mode\nComment:",
-            id="None as input",
-        ),
-    ],
-)
-@time_machine.travel("2020-01-01", tick=False)
-def test_modify_maintenance_comment_on_update(monkeypatch, initial_comment, expected_comment):
-    assert (
-        edge_executor_plugin.modify_maintenance_comment_on_update(initial_comment, "user") == expected_comment
-    )
diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
index 11e0cb25561..5f1b3b6db93 100644
--- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
@@ -20,12 +20,12 @@ from pathlib import Path
 from typing import TYPE_CHECKING
 
 import pytest
+from fastapi import HTTPException
 
 from airflow.providers.common.compat.sdk import timezone
 from airflow.providers.edge3.cli.worker import EdgeWorker
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState
 from airflow.providers.edge3.worker_api.datamodels import WorkerQueueUpdateBody, WorkerStateBody
-from airflow.providers.edge3.worker_api.routes._v2_compat import HTTPException
 from airflow.providers.edge3.worker_api.routes.worker import (
     _assert_version,
     register,
diff --git a/providers/edge3/www-hash.txt b/providers/edge3/www-hash.txt
index 5f9f2b64f5a..d71f388e496 100644
--- a/providers/edge3/www-hash.txt
+++ b/providers/edge3/www-hash.txt
@@ -1 +1 @@
-29cb09d5e6ea8dde292b43013f0d2dbbe2180687b1e9d766090b0d3f135931c9
+c8817a364febb85bea2a80df205b2b8075caf07101cfb971330dc559c4b160bf
diff --git a/scripts/in_container/run_generate_openapi_spec_providers.py b/scripts/in_container/run_generate_openapi_spec_providers.py
index 99969bf40c9..97b1c84c674 100755
--- a/scripts/in_container/run_generate_openapi_spec_providers.py
+++ b/scripts/in_container/run_generate_openapi_spec_providers.py
@@ -54,7 +54,7 @@ PROVIDERS_DEFS = {
         prefix="/auth",
     ),
     "edge": ProviderDef(
-        openapi_spec_file=Path(EDGE_PATH).parent / "openapi" / 
"v2-edge-generated.yaml",
+        openapi_spec_file=Path(EDGE_PATH).parent / "worker_api" / 
"v2-edge-generated.yaml",
         app=create_edge_worker_api_app(),
         prefix="/edge_worker",
     ),
