jscheffl commented on code in PR #61155: URL: https://github.com/apache/airflow/pull/61155#discussion_r2807861648
########## providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_3_0_0_create_edge_tables.py: ########## @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Create Edge tables if missing. + +Revision ID: 9d34dfc2de06 +Revises: +Create Date: 2025-06-01 00:00:00.000000 + +Note: This is a placeholder migration used to stamp the migration +when we create the migration from the ORM. Otherwise, it will run +without stamping the migration, leading to subsequent changes to +the tables not being migrated. Review Comment: Text seems to be from boilerplate? Can you adjust? ########## providers/edge3/tests/unit/edge3/migrations/test_env.py: ########## @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.edge3.models.db import EdgeDBManager + + +class TestMigrationEnv: + """Test edge3 migration env configuration.""" + + def test_version_table_name(self): + assert EdgeDBManager.version_table_name == "alembic_version_edge3" + + def test_target_metadata_contains_edge_tables(self): + table_names = set(EdgeDBManager.metadata.tables.keys()) + assert "edge_worker" in table_names + assert "edge_job" in table_names + assert "edge_logs" in table_names Review Comment: To be on the safe side, can you add a check that there are exactly 3 tables in the metadata? ########## providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py: ########## @@ -61,56 +58,17 @@ def __init__(self, parallelism: int = PARALLELISM): super().__init__(parallelism=parallelism) self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {} - def _check_db_schema(self, engine: Engine) -> None: - """ - Check if already existing table matches the newest table schema. - - workaround as Airflow 2.x had no support for provider DB migrations, - then it is possible to use alembic also for provider distributions. - - TODO(jscheffl): Change to alembic DB migrations in the future.
- """ - inspector = inspect(engine) - edge_job_columns = None - edge_job_command_len = None - with contextlib.suppress(NoSuchTableError): - edge_job_schema = inspector.get_columns("edge_job") - edge_job_columns = [column["name"] for column in edge_job_schema] - for column in edge_job_schema: - if column["name"] == "command": - edge_job_command_len = column["type"].length # type: ignore[attr-defined] - - # version 0.6.0rc1 added new column concurrency_slots - if edge_job_columns and "concurrency_slots" not in edge_job_columns: - EdgeJobModel.metadata.drop_all(engine, tables=[EdgeJobModel.__table__]) - - # version 1.1.0 the command column was changed to VARCHAR(2048) - elif edge_job_command_len and edge_job_command_len != 2048: - with Session(engine) as session: - query = "ALTER TABLE edge_job ALTER COLUMN command TYPE VARCHAR(2048);" - session.execute(text(query)) - session.commit() - - edge_worker_columns = None - with contextlib.suppress(NoSuchTableError): - edge_worker_columns = [column["name"] for column in inspector.get_columns("edge_worker")] - - # version 0.14.0pre0 added new column maintenance_comment - if edge_worker_columns and "maintenance_comment" not in edge_worker_columns: - with Session(engine) as session: - query = "ALTER TABLE edge_worker ADD maintenance_comment VARCHAR(1024);" - session.execute(text(query)) - session.commit() - @provide_session def start(self, session: Session = NEW_SESSION): """If EdgeExecutor provider is loaded first time, ensure table exists.""" + from airflow.providers.edge3.models.db import EdgeDBManager Review Comment: Is there a reason for the lazy import? If not, can you move it to the top of the file? ########## providers/edge3/src/airflow/providers/edge3/models/db.py: ########## @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pathlib import Path + +from sqlalchemy import MetaData, inspect + +from airflow.models.base import Base +from airflow.providers.edge3.models.edge_job import EdgeJobModel +from airflow.providers.edge3.models.edge_logs import EdgeLogsModel +from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel +from airflow.utils.db_manager import BaseDBManager + +PACKAGE_DIR = Path(__file__).parents[1] + +_REVISION_HEADS_MAP: dict[str, str] = { + "3.0.0": "9d34dfc2de06", +} + +# Create filtered metadata containing only edge3 tables +# This avoids validation issues with shared Base.metadata +_edge_metadata = MetaData() +EdgeWorkerModel.__table__.to_metadata(_edge_metadata) +EdgeJobModel.__table__.to_metadata(_edge_metadata) +EdgeLogsModel.__table__.to_metadata(_edge_metadata) + +# Remove edge tables from Airflow's core metadata to prevent validation conflicts +# The tables are now managed exclusively through _edge_metadata +Base.metadata.remove(EdgeWorkerModel.__table__) +Base.metadata.remove(EdgeJobModel.__table__) +Base.metadata.remove(EdgeLogsModel.__table__) + + +class EdgeDBManager(BaseDBManager): + """Manages Edge3 provider database tables.""" + + # Use filtered metadata instead of shared Base.metadata + metadata = _edge_metadata + + version_table_name = "alembic_version_edge3" + migration_dir = (PACKAGE_DIR / "migrations").as_posix() + alembic_file 
= (PACKAGE_DIR / "alembic.ini").as_posix() + supports_table_dropping = True + revision_heads_map = _REVISION_HEADS_MAP + + def drop_tables(self, connection): + """ + Drop only edge3 tables. + + Override base implementation to avoid dropping all tables in shared metadata. + """ + if not self.supports_table_dropping: + return + + inspector = inspect(connection) + + # Drop edge3 tables in reverse dependency order + edge_tables = [ + EdgeLogsModel.__table__, + EdgeJobModel.__table__, + EdgeWorkerModel.__table__, Review Comment: Are you sure this should not use the `__tablename__` attribute instead? ```suggestion EdgeLogsModel.__tablename__, EdgeJobModel.__tablename__, EdgeWorkerModel.__tablename__, ``` ########## providers/edge3/src/airflow/providers/edge3/models/__init__.py: ########## @@ -14,3 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +from airflow.providers.edge3.models.db import EdgeDBManager + +__all__ = ["EdgeDBManager"] Review Comment: Should `EdgeDBManager` really be the only name in `__all__`? The other model classes live in the same package as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
