Copilot commented on code in PR #64905:
URL: https://github.com/apache/airflow/pull/64905#discussion_r3056860475


##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward 
(downgrade)
+to verify that all migrations are fully reversible.  This catches problems 
like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (the 2.6.2 snapshot, looked up via
+``_REVISION_HEADS_MAP["2.6.2"]``) and iterates through each subsequent revision
+in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _REVISION_HEADS_MAP, _get_alembic_config, 
upgradedb
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = _REVISION_HEADS_MAP["2.6.2"]
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed 
base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1:]
+
+
[email protected](scope="module")
+def stairway_db():
+    """
+    Bring the DB to the squashed baseline before the stairway test runs, and
+    restore it to the current heads afterwards so other tests are unaffected.
+    """
+    config = _get_alembic_config()
+
+    # Downgrade to the squashed base so the stairway starts from a known state.
+    command.downgrade(config, revision=_SQUASHED_BASE_REVISION)
+

Review Comment:
   The test runs migrations via raw `alembic.command.upgrade/downgrade`, 
bypassing Airflow’s `upgradedb()`/`downgrade()` wrappers that add important 
safeguards (global migration lock, MySQL metadata-lock handling, and 
single-connection pool). It also skips the FAB downgrade handling that 
`airflow.utils.db.downgrade()` performs for downgrades to < 2.10.3, which this 
test does when going back to the 2.6.2 squashed baseline. Prefer using 
`airflow.utils.db.upgradedb(to_revision=...)` and 
`airflow.utils.db.downgrade(to_revision=...)` (or reusing the same locking/pool 
context managers) for each step so the test is reliable across supported DB 
backends and installed providers.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward 
(downgrade)
+to verify that all migrations are fully reversible.  This catches problems 
like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (the 2.6.2 snapshot, looked up via
+``_REVISION_HEADS_MAP["2.6.2"]``) and iterates through each subsequent revision
+in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _REVISION_HEADS_MAP, _get_alembic_config, 
upgradedb
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = _REVISION_HEADS_MAP["2.6.2"]
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed 
base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1:]

Review Comment:
   `_get_revisions_in_order()` falls back to returning all revision IDs when 
the squashed base revision is not present, but the fixture always tries to 
downgrade to `_SQUASHED_BASE_REVISION` first. If that revision is genuinely 
missing, the test will fail earlier during `command.downgrade(...)` anyway. 
Consider making this an explicit failure (assert) or `pytest.skip` with a clear 
message, rather than returning a list that can’t be used with the chosen 
starting point.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward 
(downgrade)
+to verify that all migrations are fully reversible.  This catches problems 
like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (the 2.6.2 snapshot, looked up via
+``_REVISION_HEADS_MAP["2.6.2"]``) and iterates through each subsequent revision
+in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _REVISION_HEADS_MAP, _get_alembic_config, 
upgradedb
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = _REVISION_HEADS_MAP["2.6.2"]
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed 
base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1:]
+
+
[email protected](scope="module")
+def stairway_db():
+    """
+    Bring the DB to the squashed baseline before the stairway test runs, and
+    restore it to the current heads afterwards so other tests are unaffected.
+    """
+    config = _get_alembic_config()
+
+    # Downgrade to the squashed base so the stairway starts from a known state.
+    command.downgrade(config, revision=_SQUASHED_BASE_REVISION)
+
+    yield
+
+    # Always restore to the latest heads, even if the test failed mid-way.
+    upgradedb()
+
+
+def test_migration_stairway(stairway_db) -> None:
+    """
+    Walk every incremental migration step: upgrade → downgrade → re-upgrade.
+
+    Runs as a single test (not parametrized) so execution order and DB state
+    are guaranteed without relying on xdist grouping or cross-test ordering.
+    Each step uses Alembic's relative ``-1`` target for downgrade so that merge

Review Comment:
   The PR description says this is a parametrized test with one test ID per 
revision hash, but this implementation is intentionally a single 
non-parametrized test. Either update the PR description to match the 
implementation, or change the test to be parametrized (while still ensuring DB 
state/order safety) so failures are attributed to a specific revision in the 
test report.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward 
(downgrade)
+to verify that all migrations are fully reversible.  This catches problems 
like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (the 2.6.2 snapshot, looked up via
+``_REVISION_HEADS_MAP["2.6.2"]``) and iterates through each subsequent revision
+in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _REVISION_HEADS_MAP, _get_alembic_config, 
upgradedb
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = _REVISION_HEADS_MAP["2.6.2"]
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed 
base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1:]
+
+
[email protected](scope="module")
+def stairway_db():
+    """
+    Bring the DB to the squashed baseline before the stairway test runs, and
+    restore it to the current heads afterwards so other tests are unaffected.
+    """
+    config = _get_alembic_config()
+
+    # Downgrade to the squashed base so the stairway starts from a known state.
+    command.downgrade(config, revision=_SQUASHED_BASE_REVISION)
+
+    yield
+
+    # Always restore to the latest heads, even if the test failed mid-way.
+    upgradedb()
+
+
+def test_migration_stairway(stairway_db) -> None:
+    """
+    Walk every incremental migration step: upgrade → downgrade → re-upgrade.
+
+    Runs as a single test (not parametrized) so execution order and DB state
+    are guaranteed without relying on xdist grouping or cross-test ordering.
+    Each step uses Alembic's relative ``-1`` target for downgrade so that merge
+    revisions are handled correctly — it always rolls back exactly one step 
from
+    the current head regardless of how many parents a merge revision has.
+    """
+    config = _get_alembic_config()
+    revisions = _get_revisions_in_order()
+
+    for revision_id in revisions:
+        # Step 1: upgrade to this revision
+        command.upgrade(config, revision=revision_id)
+
+        # Step 2: downgrade exactly one step back.
+        # Using the relative specifier "-1" from the current head is correct 
for
+        # both normal and merge revisions — it never picks an arbitrary parent.
+        command.downgrade(config, revision="-1")

Review Comment:
   Same issue in the main loop: calling Alembic commands directly skips 
Airflow’s migration wrappers (locking, MySQL handling, etc.). Using 
`upgradedb(to_revision=revision_id)` / `downgrade(to_revision='-1')` (or 
equivalent shared context managers around Alembic calls) will avoid 
backend-specific hangs and provider-related downgrade failures.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward 
(downgrade)
+to verify that all migrations are fully reversible.  This catches problems 
like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (the 2.6.2 snapshot, looked up via
+``_REVISION_HEADS_MAP["2.6.2"]``) and iterates through each subsequent revision
+in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _REVISION_HEADS_MAP, _get_alembic_config, 
upgradedb
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = _REVISION_HEADS_MAP["2.6.2"]
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed 
base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1:]
+
+
[email protected](scope="module")
+def stairway_db():
+    """
+    Bring the DB to the squashed baseline before the stairway test runs, and
+    restore it to the current heads afterwards so other tests are unaffected.
+    """
+    config = _get_alembic_config()
+
+    # Downgrade to the squashed base so the stairway starts from a known state.
+    command.downgrade(config, revision=_SQUASHED_BASE_REVISION)
+
+    yield
+
+    # Always restore to the latest heads, even if the test failed mid-way.
+    upgradedb()
+
+
+def test_migration_stairway(stairway_db) -> None:
+    """
+    Walk every incremental migration step: upgrade → downgrade → re-upgrade.
+
+    Runs as a single test (not parametrized) so execution order and DB state
+    are guaranteed without relying on xdist grouping or cross-test ordering.
+    Each step uses Alembic's relative ``-1`` target for downgrade so that merge
+    revisions are handled correctly — it always rolls back exactly one step 
from
+    the current head regardless of how many parents a merge revision has.
+    """
+    config = _get_alembic_config()
+    revisions = _get_revisions_in_order()
+
+    for revision_id in revisions:
+        # Step 1: upgrade to this revision
+        command.upgrade(config, revision=revision_id)
+

Review Comment:
   If a migration fails, the exception will surface without explicitly stating 
which `revision_id` was being exercised, which can make failures harder to 
triage in CI logs. Consider wrapping the per-revision steps to 
re-raise/`pytest.fail` with the current `revision_id` included in the message 
so the broken migration is immediately obvious.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to