This is an automated email from the ASF dual-hosted git repository.

turaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fcb2eb64704 Introduce EdgeDBManager: Independent Provider Specific 
Database Schema Management (#61155)
fcb2eb64704 is described below

commit fcb2eb64704b66244975ec84d585f675aba24464
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Wed Feb 18 21:17:16 2026 -0800

    Introduce EdgeDBManager: Independent Provider Specific Database Schema 
Management (#61155)
    
    * Add EdgeDBManager for provider-specific database migrations
    
      Implement EdgeDBManager to integrate Edge3 provider with Airflow's
      external database manager system, enabling independent schema version
      control for Edge3 tables separate from core Airflow migrations.
    
      This enables Edge3 provider to manage its database schema evolution
      independently from core Airflow, allowing for provider-specific version
      control and migration management. The infrastructure is ready for use
      once initial migration files are generated and the legacy 
_check_db_schema()
      approach in EdgeExecutor.start() is removed.
    
    * Add table creation support to EdgeDBManager via Alembic migration
    
      Wire up the full initdb/create_db_from_orm lifecycle in EdgeDBManager
      so table creation is handled through proper Alembic migrations instead
      of ad-hoc metadata.create_all() in EdgeExecutor.start().
    
      - Create initial Alembic migration (0001_3_0_0) for edge_worker,
        edge_job, and edge_logs tables with if_not_exists=True
      - Replace _check_db_schema() + metadata.create_all() in
        EdgeExecutor.start() with EdgeDBManager.initdb()
      - Remove legacy _check_db_schema() (Airflow 2.x workaround)
      - Add edge3 support to check_revision_heads_map.py pre-commit script
      - Add check-revision-heads-map-edge3 pre-commit hook
      - Rename migrations README to README.md
      - Add tests for create_db_from_orm, initdb, and revision_heads_map
    
    * Add missing test file for edge3 migrations env module
    
      The test_project_structure check requires every provider source module
      to have a corresponding test file. Add test_env.py for the migrations
      env.py module with tests for version table name and metadata contents.
    
    * Skip DB lock acquisition when edge3 migrations are already current
    
      Add a fast-path check_migration() call before acquiring the global
      advisory lock in EdgeExecutor.start(). This avoids unnecessary lock
      contention when multiple API server instances start simultaneously
      and the database is already at the target migration state.
    
    * Add session-scoped fixture to create edge3 tables for tests
    
      Since edge3 tables are managed via separate _edge_metadata (removed
      from Base.metadata), the test framework's initdb no longer creates
      them. Add a session-scoped autouse fixture in conftest.py that creates
      the tables once for all edge3 tests.
    
    * Fix edge3 test failures
    
      - Guard _create_edge_tables conftest fixture against settings.engine
        being None when non-DB tests (CLI) run in the same session
      - Patch _check_valid_db_connection in test_list_edge_workers since
        the test env sql_alchemy_conn equals the default value
    
    * Jens' suggestions
---
 providers/edge3/.pre-commit-config.yaml            |  10 +
 .../edge3/src/airflow/providers/edge3/alembic.ini  | 133 ++++++++++++
 .../providers/edge3/executors/edge_executor.py     |  59 +----
 .../airflow/providers/edge3/migrations/README.md   |  20 ++
 .../providers/edge3/migrations/__init__.py}        |   3 -
 .../src/airflow/providers/edge3/migrations/env.py  | 126 +++++++++++
 .../providers/edge3/migrations/script.py.mako}     |  30 ++-
 .../versions/0001_3_0_0_create_edge_tables.py      |  89 ++++++++
 .../edge3/migrations/versions/__init__.py}         |   3 -
 .../edge3/src/airflow/providers/edge3/models/db.py |  88 ++++++++
 providers/edge3/tests/conftest.py                  |  18 ++
 .../edge3/tests/unit/edge3/cli/test_worker.py      |  11 +-
 .../edge3/migrations/__init__.py}                  |   3 -
 .../edge3/migrations/test_env.py}                  |  16 +-
 providers/edge3/tests/unit/edge3/models/test_db.py | 241 +++++++++++++++++++++
 scripts/ci/prek/check_revision_heads_map.py        |  12 +-
 16 files changed, 795 insertions(+), 67 deletions(-)

diff --git a/providers/edge3/.pre-commit-config.yaml 
b/providers/edge3/.pre-commit-config.yaml
index fc6d271e994..f89d09c5d3a 100644
--- a/providers/edge3/.pre-commit-config.yaml
+++ b/providers/edge3/.pre-commit-config.yaml
@@ -46,6 +46,16 @@ repos:
         additional_dependencies: ['[email protected]']
         pass_filenames: true
         require_serial: true
+      - id: check-revision-heads-map-edge3
+        name: Check that the REVISION_HEADS_MAP is up-to-date
+        language: python
+        entry: ../../scripts/ci/prek/check_revision_heads_map.py
+        pass_filenames: false
+        files: >
+          (?x)
+          ^src/airflow/providers/edge3/migrations/versions/.*$|
+          ^src/airflow/providers/edge3/migrations/versions|
+          ^src/airflow/providers/edge3/models/db\.py$
       - id: compile-edge-assets
         name: Compile Edge provider assets
         language: node
diff --git a/providers/edge3/src/airflow/providers/edge3/alembic.ini 
b/providers/edge3/src/airflow/providers/edge3/alembic.ini
new file mode 100644
index 00000000000..75d42ee16d3
--- /dev/null
+++ b/providers/edge3/src/airflow/providers/edge3/alembic.ini
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+# Use forward slashes (/) also on windows to provide an os agnostic path
+script_location = %(here)s/migrations
+
+# template used to generate migration file names; The default value is 
%%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and 
time
+# see 
https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = 
%%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library.
+# Any required deps can installed by adding `alembic[tz]` to the pip 
requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to alembic/versions.  When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by 
"version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
+
+# version path separator; As mentioned above, this is the character used to 
split
+# version_locations. The default within new alembic.ini files is "os", which 
uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of 
splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+version_path_separator = os  # Use os.pathsep. Default configuration used for 
new projects.
+
+# set to 'true' to search source files recursively
+# in each "version_locations" directory
+# new in Alembic version 1.10
+# recursive_version_locations = false
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+sqlalchemy.url = scheme://localhost/airflow
+
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts.  See the documentation for further
+# detail and examples
+
+# format using "black" - use the console_scripts runner, against the "black" 
entrypoint
+# hooks = black
+# black.type = console_scripts
+# black.entrypoint = black
+# black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# lint with attempts to fix using "ruff" - use the exec runner, execute a 
binary
+# hooks = ruff
+# ruff.type = exec
+# ruff.executable = %(here)s/.venv/bin/ruff
+# ruff.options = --fix REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git 
a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py 
b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index 60392062469..f4a8e4b99e8 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -17,21 +17,19 @@
 
 from __future__ import annotations
 
-import contextlib
 from collections.abc import Sequence
 from copy import deepcopy
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any
 
-from sqlalchemy import delete, inspect, select, text
-from sqlalchemy.exc import NoSuchTableError
-from sqlalchemy.orm import Session
+from sqlalchemy import delete, select
 
 from airflow.configuration import conf
 from airflow.executors import workloads
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.common.compat.sdk import Stats, timezone
+from airflow.providers.edge3.models.db import EdgeDBManager
 from airflow.providers.edge3.models.edge_job import EdgeJobModel
 from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, 
EdgeWorkerState, reset_metrics
@@ -40,7 +38,7 @@ from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
-    from sqlalchemy.engine.base import Engine
+    from sqlalchemy.orm import Session
 
     from airflow.cli.cli_config import GroupCommand
     from airflow.models.taskinstancekey import TaskInstanceKey
@@ -61,56 +59,15 @@ class EdgeExecutor(BaseExecutor):
         super().__init__(parallelism=parallelism)
         self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
 
-    def _check_db_schema(self, engine: Engine) -> None:
-        """
-        Check if already existing table matches the newest table schema.
-
-        workaround as Airflow 2.x had no support for provider DB migrations,
-        then it is possible to use alembic also for provider distributions.
-
-        TODO(jscheffl): Change to alembic DB migrations in the future.
-        """
-        inspector = inspect(engine)
-        edge_job_columns = None
-        edge_job_command_len = None
-        with contextlib.suppress(NoSuchTableError):
-            edge_job_schema = inspector.get_columns("edge_job")
-            edge_job_columns = [column["name"] for column in edge_job_schema]
-            for column in edge_job_schema:
-                if column["name"] == "command":
-                    edge_job_command_len = column["type"].length  # type: 
ignore[attr-defined]
-
-        # version 0.6.0rc1 added new column concurrency_slots
-        if edge_job_columns and "concurrency_slots" not in edge_job_columns:
-            EdgeJobModel.metadata.drop_all(engine, 
tables=[EdgeJobModel.__table__])
-
-        # version 1.1.0 the command column was changed to VARCHAR(2048)
-        elif edge_job_command_len and edge_job_command_len != 2048:
-            with Session(engine) as session:
-                query = "ALTER TABLE edge_job ALTER COLUMN command TYPE 
VARCHAR(2048);"
-                session.execute(text(query))
-                session.commit()
-
-        edge_worker_columns = None
-        with contextlib.suppress(NoSuchTableError):
-            edge_worker_columns = [column["name"] for column in 
inspector.get_columns("edge_worker")]
-
-        # version 0.14.0pre0 added new column maintenance_comment
-        if edge_worker_columns and "maintenance_comment" not in 
edge_worker_columns:
-            with Session(engine) as session:
-                query = "ALTER TABLE edge_worker ADD maintenance_comment 
VARCHAR(1024);"
-                session.execute(text(query))
-                session.commit()
-
     @provide_session
     def start(self, session: Session = NEW_SESSION):
         """If EdgeExecutor provider is loaded first time, ensure table 
exists."""
+        edge_db_manager = EdgeDBManager(session)
+        if edge_db_manager.check_migration():
+            return
+
         with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-            engine = session.get_bind().engine
-            self._check_db_schema(engine)
-            EdgeJobModel.metadata.create_all(engine)
-            EdgeLogsModel.metadata.create_all(engine)
-            EdgeWorkerModel.metadata.create_all(engine)
+            edge_db_manager.initdb()
 
     def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
         """
diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/README.md 
b/providers/edge3/src/airflow/providers/edge3/migrations/README.md
new file mode 100644
index 00000000000..0c74b31e9ac
--- /dev/null
+++ b/providers/edge3/src/airflow/providers/edge3/migrations/README.md
@@ -0,0 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+Edge3 provider database migrations using Alembic.
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/src/airflow/providers/edge3/migrations/__init__.py
similarity index 90%
copy from providers/edge3/tests/conftest.py
copy to providers/edge3/src/airflow/providers/edge3/migrations/__init__.py
index f56ccce0a3f..13a83393a91 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/src/airflow/providers/edge3/migrations/__init__.py
@@ -14,6 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-pytest_plugins = "tests_common.pytest_plugin"
diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/env.py 
b/providers/edge3/src/airflow/providers/edge3/migrations/env.py
new file mode 100644
index 00000000000..6d1af6bb408
--- /dev/null
+++ b/providers/edge3/src/airflow/providers/edge3/migrations/env.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+from logging import getLogger
+from logging.config import fileConfig
+
+from alembic import context
+
+from airflow import settings
+from airflow.providers.edge3.models.db import EdgeDBManager
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+version_table = EdgeDBManager.version_table_name
+
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+if not getLogger().handlers and config.config_file_name:
+    fileConfig(config.config_file_name, disable_existing_loggers=False)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = EdgeDBManager.metadata
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+
+
+def include_object(_, name, type_, *args):
+    if type_ == "table" and name not in target_metadata.tables:
+        return False
+    return True
+
+
+def run_migrations_offline():
+    """
+    Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well.  By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+    context.configure(
+        url=settings.SQL_ALCHEMY_CONN,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        compare_type=True,
+        compare_server_default=True,
+        render_as_batch=True,
+        version_table=version_table,
+        include_object=include_object,
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def run_migrations_online():
+    """
+    Run migrations in 'online' mode.
+
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+
+    """
+
+    def process_revision_directives(context, revision, directives):
+        if getattr(config.cmd_opts, "autogenerate", False):
+            script = directives[0]
+            if script.upgrade_ops and script.upgrade_ops.is_empty():
+                directives[:] = []
+                print("No change detected in ORM schema, skipping revision.")
+
+    with contextlib.ExitStack() as stack:
+        connection = config.attributes.get("connection", None)
+
+        if not connection:
+            connection = stack.push(settings.engine.connect())
+
+        context.configure(
+            connection=connection,
+            transaction_per_migration=True,
+            target_metadata=target_metadata,
+            compare_type=True,
+            compare_server_default=True,
+            include_object=include_object,
+            render_as_batch=True,
+            process_revision_directives=process_revision_directives,
+            version_table=version_table,
+        )
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/src/airflow/providers/edge3/migrations/script.py.mako
similarity index 58%
copy from providers/edge3/tests/conftest.py
copy to providers/edge3/src/airflow/providers/edge3/migrations/script.py.mako
index f56ccce0a3f..75d7a1b92ef 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/src/airflow/providers/edge3/migrations/script.py.mako
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,6 +15,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
 
-pytest_plugins = "tests_common.pytest_plugin"
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision = ${repr(up_revision)}
+down_revision = ${repr(down_revision)}
+branch_labels = ${repr(branch_labels)}
+depends_on = ${repr(depends_on)}
+edge3_version = None
+
+
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
diff --git 
a/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_3_0_0_create_edge_tables.py
 
b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_3_0_0_create_edge_tables.py
new file mode 100644
index 00000000000..53f4ab7a2eb
--- /dev/null
+++ 
b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_3_0_0_create_edge_tables.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Create Edge tables if missing.
+
+Revision ID: 9d34dfc2de06
+Revises:
+Create Date: 2025-06-01 00:00:00.000000
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "9d34dfc2de06"
+down_revision = None
+branch_labels = None
+depends_on = None
+edge3_version = "3.0.0"
+
+
+def upgrade() -> None:
+    op.create_table(
+        "edge_worker",
+        sa.Column("worker_name", sa.String(length=64), nullable=False),
+        sa.Column("state", sa.String(length=20), nullable=False),
+        sa.Column("maintenance_comment", sa.String(length=1024), 
nullable=True),
+        sa.Column("queues", sa.String(length=256), nullable=True),
+        sa.Column("first_online", sa.DateTime(), nullable=True),
+        sa.Column("last_update", sa.DateTime(), nullable=True),
+        sa.Column("jobs_active", sa.Integer(), nullable=False),
+        sa.Column("jobs_taken", sa.Integer(), nullable=False),
+        sa.Column("jobs_success", sa.Integer(), nullable=False),
+        sa.Column("jobs_failed", sa.Integer(), nullable=False),
+        sa.Column("sysinfo", sa.String(length=256), nullable=True),
+        sa.PrimaryKeyConstraint("worker_name"),
+        if_not_exists=True,
+    )
+    op.create_table(
+        "edge_job",
+        sa.Column("dag_id", sa.String(length=250), nullable=False),
+        sa.Column("task_id", sa.String(length=250), nullable=False),
+        sa.Column("run_id", sa.String(length=250), nullable=False),
+        sa.Column("map_index", sa.Integer(), server_default=sa.text("-1"), 
nullable=False),
+        sa.Column("try_number", sa.Integer(), nullable=False),
+        sa.Column("state", sa.String(length=20), nullable=False),
+        sa.Column("queue", sa.String(length=256), nullable=False),
+        sa.Column("concurrency_slots", sa.Integer(), nullable=False),
+        sa.Column("command", sa.String(length=2048), nullable=False),
+        sa.Column("queued_dttm", sa.DateTime(), nullable=True),
+        sa.Column("edge_worker", sa.String(length=64), nullable=True),
+        sa.Column("last_update", sa.DateTime(), nullable=True),
+        sa.PrimaryKeyConstraint("dag_id", "task_id", "run_id", "map_index", 
"try_number"),
+        if_not_exists=True,
+    )
+    op.create_index("rj_order", "edge_job", ["state", "queued_dttm", "queue"], 
if_not_exists=True)
+    op.create_table(
+        "edge_logs",
+        sa.Column("dag_id", sa.String(length=250), nullable=False),
+        sa.Column("task_id", sa.String(length=250), nullable=False),
+        sa.Column("run_id", sa.String(length=250), nullable=False),
+        sa.Column("map_index", sa.Integer(), server_default=sa.text("-1"), 
nullable=False),
+        sa.Column("try_number", sa.Integer(), nullable=False),
+        sa.Column("log_chunk_time", sa.DateTime(), nullable=False),
+        sa.Column("log_chunk_data", sa.Text(), nullable=False),
+        sa.PrimaryKeyConstraint("dag_id", "task_id", "run_id", "map_index", 
"try_number", "log_chunk_time"),
+        if_not_exists=True,
+    )
+
+
+def downgrade() -> None: ...
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py
similarity index 90%
copy from providers/edge3/tests/conftest.py
copy to 
providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py
index f56ccce0a3f..13a83393a91 100644
--- a/providers/edge3/tests/conftest.py
+++ 
b/providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py
@@ -14,6 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-pytest_plugins = "tests_common.pytest_plugin"
diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py 
b/providers/edge3/src/airflow/providers/edge3/models/db.py
new file mode 100644
index 00000000000..f434da64d40
--- /dev/null
+++ b/providers/edge3/src/airflow/providers/edge3/models/db.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pathlib import Path
+
+from sqlalchemy import MetaData, inspect
+
+from airflow.models.base import Base
+from airflow.providers.edge3.models.edge_job import EdgeJobModel
+from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
+from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
+from airflow.utils.db_manager import BaseDBManager
+
+PACKAGE_DIR = Path(__file__).parents[1]
+
+_REVISION_HEADS_MAP: dict[str, str] = {
+    "3.0.0": "9d34dfc2de06",
+}
+
+# Create filtered metadata containing only edge3 tables
+# This avoids validation issues with shared Base.metadata
+_edge_metadata = MetaData()
+EdgeWorkerModel.__table__.to_metadata(_edge_metadata)
+EdgeJobModel.__table__.to_metadata(_edge_metadata)
+EdgeLogsModel.__table__.to_metadata(_edge_metadata)
+
+# Remove edge tables from Airflow's core metadata to prevent validation 
conflicts
+# The tables are now managed exclusively through _edge_metadata
+Base.metadata.remove(EdgeWorkerModel.__table__)
+Base.metadata.remove(EdgeJobModel.__table__)
+Base.metadata.remove(EdgeLogsModel.__table__)
+
+
+class EdgeDBManager(BaseDBManager):
+    """Manages Edge3 provider database tables."""
+
+    # Use filtered metadata instead of shared Base.metadata
+    metadata = _edge_metadata
+
+    version_table_name = "alembic_version_edge3"
+    migration_dir = (PACKAGE_DIR / "migrations").as_posix()
+    alembic_file = (PACKAGE_DIR / "alembic.ini").as_posix()
+    supports_table_dropping = True
+    revision_heads_map = _REVISION_HEADS_MAP
+
+    def drop_tables(self, connection):
+        """
+        Drop only edge3 tables.
+
+        Override base implementation to avoid dropping all tables in shared 
metadata.
+        """
+        if not self.supports_table_dropping:
+            return
+
+        inspector = inspect(connection)
+
+        # Drop edge3 tables in reverse dependency order
+        edge_tables = [
+            EdgeLogsModel.__table__,
+            EdgeJobModel.__table__,
+            EdgeWorkerModel.__table__,
+        ]
+
+        for table in edge_tables:
+            if inspector.has_table(table.name):
+                self.log.info("Dropping table %s", table.name)
+                table.drop(connection)
+
+        # Drop version table
+        version = self._get_migration_ctx()._version
+        if inspector.has_table(version.name):
+            self.log.info("Dropping version table %s", version.name)
+            version.drop(connection)
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/tests/conftest.py
index f56ccce0a3f..27f6db6427e 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/tests/conftest.py
@@ -16,4 +16,22 @@
 # under the License.
 from __future__ import annotations
 
+import pytest
+
 pytest_plugins = "tests_common.pytest_plugin"
+
+
[email protected](autouse=True, scope="session")
+def _create_edge_tables():
+    """Create edge3 tables for tests since they are managed separately from 
Base.metadata."""
+    from airflow import settings
+
+    if not settings.engine:
+        yield
+        return
+
+    from airflow.providers.edge3.models.db import _edge_metadata
+
+    _edge_metadata.create_all(settings.engine)
+    yield
+    _edge_metadata.drop_all(settings.engine)
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py 
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 2d6d4542a8d..b8fa906dbd3 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -461,9 +461,14 @@ class TestEdgeWorker:
     def test_list_edge_workers(self, mock_edgeworker: EdgeWorkerModel):
         args = self.parser.parse_args(["edge", "list-workers", "--output", 
"json"])
         with contextlib.redirect_stdout(StringIO()) as temp_stdout:
-            with patch(
-                
"airflow.providers.edge3.models.edge_worker.get_registered_edge_hosts",
-                return_value=[mock_edgeworker],
+            with (
+                patch(
+                    
"airflow.providers.edge3.cli.edge_command._check_valid_db_connection",
+                ),
+                patch(
+                    
"airflow.providers.edge3.models.edge_worker.get_registered_edge_hosts",
+                    return_value=[mock_edgeworker],
+                ),
             ):
                 edge_command.list_edge_workers(args)
                 out = temp_stdout.getvalue()
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/tests/unit/edge3/migrations/__init__.py
similarity index 90%
copy from providers/edge3/tests/conftest.py
copy to providers/edge3/tests/unit/edge3/migrations/__init__.py
index f56ccce0a3f..13a83393a91 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/tests/unit/edge3/migrations/__init__.py
@@ -14,6 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-pytest_plugins = "tests_common.pytest_plugin"
diff --git a/providers/edge3/tests/conftest.py 
b/providers/edge3/tests/unit/edge3/migrations/test_env.py
similarity index 60%
copy from providers/edge3/tests/conftest.py
copy to providers/edge3/tests/unit/edge3/migrations/test_env.py
index f56ccce0a3f..eb455f5757c 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/tests/unit/edge3/migrations/test_env.py
@@ -16,4 +16,18 @@
 # under the License.
 from __future__ import annotations
 
-pytest_plugins = "tests_common.pytest_plugin"
+from airflow.providers.edge3.models.db import EdgeDBManager
+
+
class TestMigrationEnv:
    """Checks for the edge3 migration environment configuration."""

    def test_version_table_name(self):
        # Edge3 keeps its own alembic version table, separate from core Airflow's.
        assert EdgeDBManager.version_table_name == "alembic_version_edge3"

    def test_target_metadata_contains_edge_tables(self):
        # Exactly the three edge3 tables must be registered in the manager's metadata.
        assert sorted(EdgeDBManager.metadata.tables) == [
            "edge_job",
            "edge_logs",
            "edge_worker",
        ]
diff --git a/providers/edge3/tests/unit/edge3/models/test_db.py 
b/providers/edge3/tests/unit/edge3/models/test_db.py
new file mode 100644
index 00000000000..79739f2847c
--- /dev/null
+++ b/providers/edge3/tests/unit/edge3/models/test_db.py
@@ -0,0 +1,241 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.utils.db_manager import RunDBManager
+
+from tests_common.test_utils.config import conf_vars
+
+pytestmark = [pytest.mark.db_test]
+
+
class TestEdgeDBManager:
    """
    Tests for EdgeDBManager.

    Covers discovery via [database] external_db_managers, delegation from
    RunDBManager, the initdb/create_db_from_orm lifecycle, and that
    drop_tables touches only edge3-owned tables.
    """

    # Dotted path to the manager and the matching conf override, shared by
    # the config-driven tests below to avoid copy-pasting the same dict.
    _MANAGER_PATH = "airflow.providers.edge3.models.db.EdgeDBManager"
    _MANAGER_CONF = {("database", "external_db_managers"): _MANAGER_PATH}

    @staticmethod
    def _mock_migration_ctx():
        """Build a mocked migration context; return (ctx, its version table mock)."""
        version_table = mock.MagicMock()
        version_table.name = "alembic_version_edge3"
        migration_ctx = mock.MagicMock()
        migration_ctx._version = version_table
        return migration_ctx, version_table

    @conf_vars(_MANAGER_CONF)
    def test_db_manager_uses_config(self):
        """EdgeDBManager is discovered through [database] external_db_managers."""
        from airflow.providers.edge3.models.db import EdgeDBManager

        run_db_manager = RunDBManager()
        assert run_db_manager._managers == [EdgeDBManager]

    @conf_vars(_MANAGER_CONF)
    def test_edge_db_manager_attributes(self):
        """EdgeDBManager exposes the expected alembic configuration."""
        from airflow.providers.edge3.models.db import EdgeDBManager

        assert EdgeDBManager.version_table_name == "alembic_version_edge3"
        assert EdgeDBManager.supports_table_dropping is True
        assert "migrations" in EdgeDBManager.migration_dir
        assert "alembic.ini" in EdgeDBManager.alembic_file

    @conf_vars(_MANAGER_CONF)
    @mock.patch(_MANAGER_PATH)
    def test_rundbmanager_calls_edge_dbmanager_methods(self, mock_edge_db_manager, session):
        """RunDBManager delegates initdb/upgradedb/drop_tables to EdgeDBManager."""
        mock_edge_db_manager.supports_table_dropping = True
        edge_db_manager = mock_edge_db_manager.return_value
        ext_db = RunDBManager()

        # initdb
        ext_db.initdb(session=session)
        edge_db_manager.initdb.assert_called_once()

        # upgradedb
        ext_db.upgradedb(session=session)
        edge_db_manager.upgradedb.assert_called_once()

        # drop_tables
        connection = mock.MagicMock()
        ext_db.drop_tables(session, connection)
        edge_db_manager.drop_tables.assert_called_once_with(connection)

    def test_drop_tables_only_drops_edge_tables(self, session):
        """drop_tables drops exactly the edge3 tables plus the version table."""
        from airflow.providers.edge3.models.db import EdgeDBManager
        from airflow.providers.edge3.models.edge_job import EdgeJobModel
        from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
        from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel

        manager = EdgeDBManager(session)
        mock_connection = mock.MagicMock()
        mock_inspector = mock.MagicMock()
        # Report every table as present so all drop branches execute.
        mock_inspector.has_table.return_value = True
        migration_ctx, version_table = self._mock_migration_ctx()

        with (
            mock.patch("airflow.providers.edge3.models.db.inspect", return_value=mock_inspector),
            mock.patch.object(manager, "_get_migration_ctx", return_value=migration_ctx),
            mock.patch.object(EdgeWorkerModel.__table__, "drop") as mock_worker_drop,
            mock.patch.object(EdgeJobModel.__table__, "drop") as mock_job_drop,
            mock.patch.object(EdgeLogsModel.__table__, "drop") as mock_logs_drop,
        ):
            manager.drop_tables(mock_connection)

            # Only the three edge3 tables and the version table are dropped.
            mock_worker_drop.assert_called_once_with(mock_connection)
            mock_job_drop.assert_called_once_with(mock_connection)
            mock_logs_drop.assert_called_once_with(mock_connection)
            version_table.drop.assert_called_once_with(mock_connection)

            # Existence was checked for each table before dropping it.
            mock_inspector.has_table.assert_has_calls(
                [
                    mock.call("edge_logs"),
                    mock.call("edge_job"),
                    mock.call("edge_worker"),
                    mock.call("alembic_version_edge3"),
                ],
                any_order=True,
            )

    @mock.patch("airflow.utils.db_manager.command")
    def test_create_db_from_orm(self, mock_command, session):
        """create_db_from_orm creates all tables then stamps the head revision."""
        from airflow.providers.edge3.models.db import EdgeDBManager

        manager = EdgeDBManager(session)

        with mock.patch.object(manager.metadata, "create_all") as mock_create_all:
            manager.create_db_from_orm()

            mock_create_all.assert_called_once()
            mock_command.stamp.assert_called_once()
            # Second positional argument of alembic's stamp() is the revision.
            assert mock_command.stamp.call_args[0][1] == "head"

    @mock.patch(f"{_MANAGER_PATH}.upgradedb")
    @mock.patch(f"{_MANAGER_PATH}.create_db_from_orm")
    @mock.patch(f"{_MANAGER_PATH}.get_current_revision")
    def test_initdb_new_db(self, mock_get_rev, mock_create, mock_upgrade, session):
        """initdb creates the schema from the ORM when no revision is stamped."""
        from airflow.providers.edge3.models.db import EdgeDBManager

        mock_get_rev.return_value = None

        manager = EdgeDBManager(session)
        manager.initdb()

        mock_create.assert_called_once()
        mock_upgrade.assert_not_called()

    @mock.patch(f"{_MANAGER_PATH}.upgradedb")
    @mock.patch(f"{_MANAGER_PATH}.create_db_from_orm")
    @mock.patch(f"{_MANAGER_PATH}.get_current_revision")
    def test_initdb_existing_db(self, mock_get_rev, mock_create, mock_upgrade, session):
        """initdb runs migrations when the database already has a revision."""
        from airflow.providers.edge3.models.db import EdgeDBManager

        mock_get_rev.return_value = "9d34dfc2de06"

        manager = EdgeDBManager(session)
        manager.initdb()

        mock_upgrade.assert_called_once()
        mock_create.assert_not_called()

    def test_revision_heads_map_populated(self):
        """_REVISION_HEADS_MAP carries the initial 3.0.0 migration revision."""
        from airflow.providers.edge3.models.db import _REVISION_HEADS_MAP

        assert "3.0.0" in _REVISION_HEADS_MAP
        assert _REVISION_HEADS_MAP["3.0.0"] == "9d34dfc2de06"

    def test_drop_tables_handles_missing_tables(self, session):
        """drop_tables neither raises nor drops anything when no tables exist."""
        from airflow.providers.edge3.models.db import EdgeDBManager
        from airflow.providers.edge3.models.edge_job import EdgeJobModel
        from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
        from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel

        manager = EdgeDBManager(session)
        mock_connection = mock.MagicMock()
        mock_inspector = mock.MagicMock()
        # Report that no tables exist, so every drop branch must be skipped.
        mock_inspector.has_table.return_value = False
        migration_ctx, version_table = self._mock_migration_ctx()

        with (
            mock.patch("airflow.providers.edge3.models.db.inspect", return_value=mock_inspector),
            mock.patch.object(manager, "_get_migration_ctx", return_value=migration_ctx),
            mock.patch.object(EdgeWorkerModel.__table__, "drop") as mock_worker_drop,
            mock.patch.object(EdgeJobModel.__table__, "drop") as mock_job_drop,
            mock.patch.object(EdgeLogsModel.__table__, "drop") as mock_logs_drop,
        ):
            # Must not raise even though nothing exists.
            manager.drop_tables(mock_connection)

        # Existence was checked, and crucially nothing was actually dropped.
        assert mock_inspector.has_table.called
        mock_worker_drop.assert_not_called()
        mock_job_drop.assert_not_called()
        mock_logs_drop.assert_not_called()
        version_table.drop.assert_not_called()
diff --git a/scripts/ci/prek/check_revision_heads_map.py 
b/scripts/ci/prek/check_revision_heads_map.py
index 969b66d182b..02a87c7e56b 100755
--- a/scripts/ci/prek/check_revision_heads_map.py
+++ b/scripts/ci/prek/check_revision_heads_map.py
@@ -40,6 +40,10 @@ FAB_PROVIDER_SRC_PATH = AIRFLOW_PROVIDERS_ROOT_PATH / "fab" 
/ "src"
 FAB_DB_FILE = FAB_PROVIDER_SRC_PATH / "airflow" / "providers" / "fab" / 
"auth_manager" / "models" / "db.py"
 FAB_MIGRATION_PATH = FAB_PROVIDER_SRC_PATH / "airflow" / "providers" / "fab" / 
"migrations" / "versions"
 
+EDGE3_PROVIDER_SRC_PATH = AIRFLOW_PROVIDERS_ROOT_PATH / "edge3" / "src"
+EDGE3_DB_FILE = EDGE3_PROVIDER_SRC_PATH / "airflow" / "providers" / "edge3" / 
"models" / "db.py"
+EDGE3_MIGRATION_PATH = EDGE3_PROVIDER_SRC_PATH / "airflow" / "providers" / 
"edge3" / "migrations" / "versions"
+
 
 def revision_heads_map(migration_path):
     rh_map = {}
@@ -49,6 +53,8 @@ def revision_heads_map(migration_path):
         version_pattern = r'airflow_version = "\d+\.\d+\.\d+"'
     elif migration_path == FAB_MIGRATION_PATH:
         version_pattern = r'fab_version = "\d+\.\d+\.\d+"'
+    elif migration_path == EDGE3_MIGRATION_PATH:
+        version_pattern = r'edge3_version = "\d+\.\d+\.\d+"'
     filenames = os.listdir(migration_path)
 
     def sorting_key(filen):
@@ -72,7 +78,11 @@ def revision_heads_map(migration_path):
 
 
 if __name__ == "__main__":
-    paths = [(DB_FILE, MIGRATION_PATH), (FAB_DB_FILE, FAB_MIGRATION_PATH)]
+    paths = [
+        (DB_FILE, MIGRATION_PATH),
+        (FAB_DB_FILE, FAB_MIGRATION_PATH),
+        (EDGE3_DB_FILE, EDGE3_MIGRATION_PATH),
+    ]
     for dbfile, mpath in paths:
         with open(dbfile) as file:
             content = file.read()


Reply via email to