This is an automated email from the ASF dual-hosted git repository.
turaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a7cbd42137f Replace Base.metadata.remove() hack with dedicated Edge3
declarative base (#62202)
a7cbd42137f is described below
commit a7cbd42137f5f0a061dcf868cc143272f2985f95
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Sat Feb 21 15:57:22 2026 -0600
Replace Base.metadata.remove() hack with dedicated Edge3 declarative base
(#62202)
* Replace Base.metadata.remove() hack with dedicated Edge3 declarative base
Introduce models/edge_base.py with its own MetaData (reusing core's schema
and naming convention), mapper registry, and declarative Base for the
Edge3 provider. All three Edge models (edge_worker, edge_job, edge_logs)
now inherit from this isolated Base, so their tables are never registered
in Airflow core's Base.metadata in the first place — eliminating the need
for the to_metadata() copy + Base.metadata.remove() workaround in db.py.
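* Fix tests: conftest.py now creates and drops the Edge3 tables through the
new edge_base.edge_metadata instead of the removed db._edge_metadata
* Add unit tests for edge_base covering isolation from core metadata,
registration of the three Edge3 tables, the shared naming convention, and
the __allow_unmapped__ flag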
---
.../edge3/src/airflow/providers/edge3/models/db.py | 28 ++---------
.../airflow/providers/edge3/models/edge_base.py} | 28 +++++------
.../src/airflow/providers/edge3/models/edge_job.py | 3 +-
.../airflow/providers/edge3/models/edge_logs.py | 3 +-
.../airflow/providers/edge3/models/edge_worker.py | 2 +-
providers/edge3/tests/conftest.py | 6 +--
.../tests/unit/edge3/models/test_edge_base.py | 55 ++++++++++++++++++++++
7 files changed, 78 insertions(+), 47 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py
b/providers/edge3/src/airflow/providers/edge3/models/db.py
index f434da64d40..ac61ed987c3 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/db.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/db.py
@@ -18,9 +18,9 @@ from __future__ import annotations
from pathlib import Path
-from sqlalchemy import MetaData, inspect
+from sqlalchemy import inspect
-from airflow.models.base import Base
+from airflow.providers.edge3.models.edge_base import edge_metadata
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
@@ -32,25 +32,11 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.0.0": "9d34dfc2de06",
}
-# Create filtered metadata containing only edge3 tables
-# This avoids validation issues with shared Base.metadata
-_edge_metadata = MetaData()
-EdgeWorkerModel.__table__.to_metadata(_edge_metadata)
-EdgeJobModel.__table__.to_metadata(_edge_metadata)
-EdgeLogsModel.__table__.to_metadata(_edge_metadata)
-
-# Remove edge tables from Airflow's core metadata to prevent validation
conflicts
-# The tables are now managed exclusively through _edge_metadata
-Base.metadata.remove(EdgeWorkerModel.__table__)
-Base.metadata.remove(EdgeJobModel.__table__)
-Base.metadata.remove(EdgeLogsModel.__table__)
-
class EdgeDBManager(BaseDBManager):
"""Manages Edge3 provider database tables."""
- # Use filtered metadata instead of shared Base.metadata
- metadata = _edge_metadata
+ metadata = edge_metadata
version_table_name = "alembic_version_edge3"
migration_dir = (PACKAGE_DIR / "migrations").as_posix()
@@ -59,17 +45,12 @@ class EdgeDBManager(BaseDBManager):
revision_heads_map = _REVISION_HEADS_MAP
def drop_tables(self, connection):
- """
- Drop only edge3 tables.
-
- Override base implementation to avoid dropping all tables in shared
metadata.
- """
+ """Drop only edge3 tables in reverse dependency order."""
if not self.supports_table_dropping:
return
inspector = inspect(connection)
- # Drop edge3 tables in reverse dependency order
edge_tables = [
EdgeLogsModel.__table__,
EdgeJobModel.__table__,
@@ -81,7 +62,6 @@ class EdgeDBManager(BaseDBManager):
self.log.info("Dropping table %s", table.name)
table.drop(connection)
- # Drop version table
version = self._get_migration_ctx()._version
if inspector.has_table(version.name):
self.log.info("Dropping version table %s", version.name)
diff --git a/providers/edge3/tests/conftest.py
b/providers/edge3/src/airflow/providers/edge3/models/edge_base.py
similarity index 55%
copy from providers/edge3/tests/conftest.py
copy to providers/edge3/src/airflow/providers/edge3/models/edge_base.py
index 27f6db6427e..2d97c997599 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_base.py
@@ -16,22 +16,16 @@
# under the License.
from __future__ import annotations
-import pytest
+from sqlalchemy import MetaData
+from sqlalchemy.orm import registry
-pytest_plugins = "tests_common.pytest_plugin"
+from airflow.models.base import _get_schema, naming_convention
-
[email protected](autouse=True, scope="session")
-def _create_edge_tables():
- """Create edge3 tables for tests since they are managed separately from
Base.metadata."""
- from airflow import settings
-
- if not settings.engine:
- yield
- return
-
- from airflow.providers.edge3.models.db import _edge_metadata
-
- _edge_metadata.create_all(settings.engine)
- yield
- _edge_metadata.drop_all(settings.engine)
+# Isolated metadata for Edge3 provider tables.
+# By using a dedicated MetaData + registry + Base, Edge3 tables are never
+# registered in Airflow core's Base.metadata, avoiding validation conflicts
+# without needing the post-hoc Base.metadata.remove() hack.
+edge_metadata = MetaData(schema=_get_schema(),
naming_convention=naming_convention)
+_edge_mapper_registry = registry(metadata=edge_metadata)
+Base = _edge_mapper_registry.generate_base()
+Base.__allow_unmapped__ = True # match core Base workaround for unmapped v1.4
models
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_job.py
b/providers/edge3/src/airflow/providers/edge3/models/edge_job.py
index d3227903465..d98f3076a32 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_job.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_job.py
@@ -26,9 +26,10 @@ from sqlalchemy import (
)
from sqlalchemy.orm import Mapped
-from airflow.models.base import Base, StringID
+from airflow.models.base import StringID
from airflow.providers.common.compat.sdk import TaskInstanceKey, timezone
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
+from airflow.providers.edge3.models.edge_base import Base
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py
b/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py
index f4366756841..df9e7cd8db3 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py
@@ -26,8 +26,9 @@ from sqlalchemy import (
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import Mapped
-from airflow.models.base import Base, StringID
+from airflow.models.base import StringID
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
+from airflow.providers.edge3.models.edge_base import Base
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 2e739826793..e4b0698e7e0 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -26,8 +26,8 @@ from typing import TYPE_CHECKING
from sqlalchemy import Integer, String, delete, select
from sqlalchemy.orm import Mapped
-from airflow.models.base import Base
from airflow.providers.common.compat.sdk import AirflowException, Stats,
timezone
+from airflow.providers.edge3.models.edge_base import Base
try:
from airflow.sdk.observability.stats import DualStatsManager
diff --git a/providers/edge3/tests/conftest.py
b/providers/edge3/tests/conftest.py
index 27f6db6427e..b5de80cd45a 100644
--- a/providers/edge3/tests/conftest.py
+++ b/providers/edge3/tests/conftest.py
@@ -30,8 +30,8 @@ def _create_edge_tables():
yield
return
- from airflow.providers.edge3.models.db import _edge_metadata
+ from airflow.providers.edge3.models.edge_base import edge_metadata
- _edge_metadata.create_all(settings.engine)
+ edge_metadata.create_all(settings.engine)
yield
- _edge_metadata.drop_all(settings.engine)
+ edge_metadata.drop_all(settings.engine)
diff --git a/providers/edge3/tests/unit/edge3/models/test_edge_base.py
b/providers/edge3/tests/unit/edge3/models/test_edge_base.py
new file mode 100644
index 00000000000..148bb71e92e
--- /dev/null
+++ b/providers/edge3/tests/unit/edge3/models/test_edge_base.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.models.base import naming_convention
+from airflow.providers.edge3.models.edge_base import Base, edge_metadata
+
+
+class TestEdgeBase:
+ def test_edge_metadata_is_isolated_from_core(self):
+ """edge_metadata must not be the same object as Airflow core's
Base.metadata."""
+ from airflow.models.base import Base as CoreBase
+
+ assert edge_metadata is not CoreBase.metadata
+
+ def test_edge_tables_not_in_core_metadata(self):
+ """Edge3 tables must never appear in Airflow core's Base.metadata."""
+ from airflow.models.base import Base as CoreBase
+
+ edge_table_names = {"edge_worker", "edge_job", "edge_logs"}
+ core_table_names = set(CoreBase.metadata.tables.keys())
+ assert not edge_table_names & core_table_names
+
+ def test_edge_metadata_contains_edge_tables(self):
+ """edge_metadata must contain all three Edge3 tables."""
+ # Import models to ensure they are registered
+ import airflow.providers.edge3.models.edge_job
+ import airflow.providers.edge3.models.edge_logs
+ import airflow.providers.edge3.models.edge_worker # noqa: F401
+
+ assert "edge_worker" in edge_metadata.tables
+ assert "edge_job" in edge_metadata.tables
+ assert "edge_logs" in edge_metadata.tables
+
+ def test_edge_metadata_uses_same_naming_convention_as_core(self):
+ """edge_metadata should use the same naming convention as Airflow
core."""
+ assert edge_metadata.naming_convention == naming_convention
+
+ def test_base_allow_unmapped(self):
+ """Base must have __allow_unmapped__ set to match core Base
workaround."""
+ assert Base.__allow_unmapped__ is True