jscheffl commented on code in PR #68074:
URL: https://github.com/apache/airflow/pull/68074#discussion_r3365334043
##########
chart/templates/jobs/migrate-database-job.yaml:
##########
@@ -105,6 +105,16 @@ spec:
{{- end }}
{{- if .Values.migrateDatabaseJob.args }}
args: {{- tpl (toYaml .Values.migrateDatabaseJob.args) . | nindent
12 }}
+ {{- else }}
+ # Bidirectional metadata DB reconciliation: forward migrate, no-op,
or
+ # exec downgrade in the still-running api-server pod depending on the
+ # relationship between the chart's target version and the DB's
current
+ # alembic head. See https://github.com/apache/airflow/issues/68072.
Review Comment:
Same here: I would not point to the issue ticket.
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
Review Comment:
It would be great to open a PR in parallel that exposes the needed information
on a public interface, so that it can land in Airflow 3.3. Then this script
could first attempt to use the public method and fall back on the private member
until (at some point in the future) support for Airflow 3.3 is cleaned up.
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+ """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+ target_rev = _REVISION_HEADS_MAP.get(target)
+ if target_rev is None:
+ # Unknown target version (e.g. dev build). Be conservative: forward
only.
+ return "forward"
+
+ try:
+ with engine.connect() as conn:
+ current_rev =
MigrationContext.configure(conn).get_current_revision()
+ except OperationalError:
+ # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+ # re-surface a real connectivity error with a clearer message.
+ return "fresh"
+
+ if current_rev is None:
+ return "fresh"
+ if current_rev == target_rev:
+ return "noop"
+
+ # Walk the TARGET image's revision graph from base to target_rev. Target's
+ # ScriptDirectory only contains revisions up to target's heads, so we
+ # cannot walk from an unknown current_rev (the downgrade case): the call
+ # would raise ``RevisionError``. Instead, build the ancestor set of target
+ # and check membership.
+ cfg = Config()
+ cfg.set_main_option(
+ "script_location",
+ os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+ )
+ script = ScriptDirectory.from_config(cfg)
+ ancestors_of_target = {rev.revision for rev in
script.walk_revisions("base", target_rev)}
+ if current_rev in ancestors_of_target:
+ # current is an ancestor of target -> moving forward.
+ return "forward"
+ # current is not in target's history -> must be newer than target.
+ return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+ """Return the name of a Running api-server pod in *namespace*."""
+ k8s_config.load_incluster_config()
+ api = client.CoreV1Api()
+ pods = api.list_namespaced_pod(
+ namespace=namespace,
+ label_selector="component=api-server",
+ field_selector="status.phase=Running",
+ ).items
+ if not pods:
+ raise RuntimeError(f"no Running api-server pod found in namespace
{namespace}")
+
+ # Prefer Ready pods so we don't pick one mid-rollout.
+ ready = [
+ p for p in pods if any(c.type == "Ready" and c.status == "True" for c
in (p.status.conditions or []))
+ ]
+ return (ready or pods)[0].metadata.name
Review Comment:
Handling of case that no pod is ready is missing, gets an
array-out-of-bounds.
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+ """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+ target_rev = _REVISION_HEADS_MAP.get(target)
+ if target_rev is None:
+ # Unknown target version (e.g. dev build). Be conservative: forward
only.
+ return "forward"
+
+ try:
+ with engine.connect() as conn:
+ current_rev =
MigrationContext.configure(conn).get_current_revision()
+ except OperationalError:
+ # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+ # re-surface a real connectivity error with a clearer message.
+ return "fresh"
+
+ if current_rev is None:
+ return "fresh"
+ if current_rev == target_rev:
+ return "noop"
+
+ # Walk the TARGET image's revision graph from base to target_rev. Target's
+ # ScriptDirectory only contains revisions up to target's heads, so we
+ # cannot walk from an unknown current_rev (the downgrade case): the call
+ # would raise ``RevisionError``. Instead, build the ancestor set of target
+ # and check membership.
+ cfg = Config()
+ cfg.set_main_option(
+ "script_location",
+ os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+ )
+ script = ScriptDirectory.from_config(cfg)
+ ancestors_of_target = {rev.revision for rev in
script.walk_revisions("base", target_rev)}
+ if current_rev in ancestors_of_target:
Review Comment:
Why are you not looking into `_REVISION_HEADS_MAP` like above? Assuming that
only "real" versions are listed, I _think_ this is sufficient. If somebody has a
version in between or builds manually from main, I think it would be OK not to
support all edge cases. This would reduce the dependency on internal Airflow details.
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
Review Comment:
I think a reference to a past issue is not really needed. The context from above
is sufficient.
##########
chart/templates/rbac/migrate-database-job-role.yaml:
##########
@@ -0,0 +1,78 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+#######################################
+## Airflow Migrate Database Job Role
+##
+## Grants pods + pods/exec in the release namespace so the migrate-database-job
+## can exec ``airflow db downgrade`` inside the still-running api-server pod on
+## a chart downgrade. The forward-migrate branch never uses these permissions.
+##
+## Tracked in https://github.com/apache/airflow/issues/68072.
Review Comment:
Also here, I propose removing the ticket number.
##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler
``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+The decide_action regression test exists because the first iteration walked
+the target image's ScriptDirectory from base to ``current_rev``, which raises
+``RevisionError`` in the actual downgrade case where ``current_rev`` is newer
+than anything the target image knows about. See
+https://github.com/apache/airflow/issues/68072.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" /
"db_migrate.py"
+
+
[email protected](scope="module")
+def db_migrate():
+ """Load chart/files/db_migrate.py as a module.
+
+ The file is normally fed to ``python3 -c`` by the helm job rather than
+ imported, so it lives outside the chart's Python package tree.
+ """
+ spec = importlib.util.spec_from_file_location("chart_db_migrate",
DB_MIGRATE_PATH)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
[email protected]
+def patched_revision_map(db_migrate, monkeypatch):
+ """Install a stable {version: revision} mapping for decide_action tests."""
+ fake_map = {
+ "3.0.0": "rev_300",
+ "3.1.0": "rev_310",
+ "3.2.0": "rev_320",
+ }
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+ return fake_map
+
+
+def _patch_engine_returning(db_migrate, monkeypatch, current_rev):
+ """Patch ``engine.connect()`` so MigrationContext.get_current_revision()
returns *current_rev*."""
+
+ class _Ctx:
+ def get_current_revision(self):
+ return current_rev
+
+ class _Conn:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ return False
+
+ monkeypatch.setattr(db_migrate.engine, "connect", lambda: _Conn())
+ monkeypatch.setattr(
+ db_migrate.MigrationContext,
+ "configure",
+ staticmethod(lambda _conn: _Ctx()),
+ )
+
+
+def _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head):
+ """Patch ScriptDirectory.from_config so walk_revisions("base", X) yields
ancestors_by_head[X]."""
+
+ class _Rev:
+ def __init__(self, revision):
+ self.revision = revision
+
+ class _ScriptDir:
+ def walk_revisions(self, base, head):
+ assert base == "base"
+ return [_Rev(r) for r in ancestors_by_head[head]]
+
+ monkeypatch.setattr(db_migrate.ScriptDirectory, "from_config",
staticmethod(lambda _cfg: _ScriptDir()))
+
+
+def test_decide_action_unknown_target_falls_back_to_forward(db_migrate,
monkeypatch):
+ # Empty map: any target is unknown -> conservative forward migrate.
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", {})
+ assert db_migrate.decide_action("9.9.9") == "forward"
+
+
+def test_decide_action_fresh_when_db_unreachable(db_migrate, monkeypatch,
patched_revision_map):
+ def _raise(*_a, **_kw):
+ raise OperationalError("SELECT 1", {}, Exception("unreachable"))
+
+ monkeypatch.setattr(db_migrate.engine, "connect", _raise)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_fresh_when_no_alembic_row(db_migrate, monkeypatch,
patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev=None)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_noop_when_current_equals_target(db_migrate,
monkeypatch, patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_310")
+ assert db_migrate.decide_action("3.1.0") == "noop"
+
+
+def test_decide_action_forward_when_current_is_ancestor_of_target(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ # current=3.0.0, target=3.1.0 -> rev_300 is in target's ancestor set.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_300")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "forward"
+
+
+def test_decide_action_downgrade_when_current_not_in_target_ancestors(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ """Regression test for the original blocker.
+
+ On a real downgrade, the TARGET image's ScriptDirectory does NOT contain
+ ``current_rev`` (current is newer than target). The previous implementation
+ called ``walk_revisions("base", current_rev)`` which raised
+ ``RevisionError`` and never reached the downgrade branch. The fix walks to
+ ``target_rev`` (always present in target's scripts) and checks membership.
+ """
+ # current=3.2.0 (newer), target=3.1.0 (older). rev_320 is unknown to
target.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_320")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "downgrade"
+
+
+# --------------------------------------------------------------------------
+# discover_api_server_pod
+# --------------------------------------------------------------------------
+
+
+def _pod(name, ready=True):
+ pod = types.SimpleNamespace()
+ pod.metadata = types.SimpleNamespace(name=name)
+ pod.status = types.SimpleNamespace(
+ conditions=[types.SimpleNamespace(type="Ready", status="True" if ready
else "False")]
+ )
+ return pod
+
+
+def test_discover_api_server_pod_prefers_ready(db_migrate, monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = [
+ _pod("api-server-old", ready=False),
+ _pod("api-server-new", ready=True),
+ ]
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ assert db_migrate.discover_api_server_pod("airflow") == "api-server-new"
+ fake_api.list_namespaced_pod.assert_called_once_with(
+ namespace="airflow",
+ label_selector="component=api-server",
+ field_selector="status.phase=Running",
+ )
+
+
+def test_discover_api_server_pod_falls_back_to_non_ready(db_migrate,
monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = [_pod("api-server-old",
ready=False)]
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ assert db_migrate.discover_api_server_pod("airflow") == "api-server-old"
+
+
+def test_discover_api_server_pod_raises_when_none(db_migrate, monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = []
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ with pytest.raises(RuntimeError, match="no Running api-server pod"):
+ db_migrate.discover_api_server_pod("airflow")
+
+
+# --------------------------------------------------------------------------
+# run_downgrade_in_api_server
+# --------------------------------------------------------------------------
+
+
+def _fake_stream(returncode):
Review Comment:
Why not make this a fixture?
##########
chart/values.yaml:
##########
@@ -2121,13 +2121,11 @@ migrateDatabaseJob:
command: ~
# Args to use when running the migrate database job (templated).
- args:
- - "bash"
- - "-c"
- - >-
- exec \
-
- airflow db migrate
+ # When unset, the chart runs a bidirectional reconciliation script
+ # (forward migrate, no-op, or exec downgrade against the running api-server)
+ # depending on whether the chart is being upgraded or downgraded.
+ # See https://github.com/apache/airflow/issues/68072.
Review Comment:
Also here... ticket ref not needed.
##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler
``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+The decide_action regression test exists because the first iteration walked
+the target image's ScriptDirectory from base to ``current_rev``, which raises
+``RevisionError`` in the actual downgrade case where ``current_rev`` is newer
+than anything the target image knows about. See
+https://github.com/apache/airflow/issues/68072.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" /
"db_migrate.py"
+
+
[email protected](scope="module")
+def db_migrate():
+ """Load chart/files/db_migrate.py as a module.
+
+ The file is normally fed to ``python3 -c`` by the helm job rather than
+ imported, so it lives outside the chart's Python package tree.
+ """
+ spec = importlib.util.spec_from_file_location("chart_db_migrate",
DB_MIGRATE_PATH)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
[email protected]
+def patched_revision_map(db_migrate, monkeypatch):
+ """Install a stable {version: revision} mapping for decide_action tests."""
+ fake_map = {
+ "3.0.0": "rev_300",
+ "3.1.0": "rev_310",
+ "3.2.0": "rev_320",
+ }
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+ return fake_map
+
+
+def _patch_engine_returning(db_migrate, monkeypatch, current_rev):
+ """Patch ``engine.connect()`` so MigrationContext.get_current_revision()
returns *current_rev*."""
+
+ class _Ctx:
+ def get_current_revision(self):
+ return current_rev
+
+ class _Conn:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ return False
+
+ monkeypatch.setattr(db_migrate.engine, "connect", lambda: _Conn())
+ monkeypatch.setattr(
+ db_migrate.MigrationContext,
+ "configure",
+ staticmethod(lambda _conn: _Ctx()),
+ )
+
+
+def _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head):
+ """Patch ScriptDirectory.from_config so walk_revisions("base", X) yields
ancestors_by_head[X]."""
+
+ class _Rev:
+ def __init__(self, revision):
+ self.revision = revision
+
+ class _ScriptDir:
+ def walk_revisions(self, base, head):
+ assert base == "base"
+ return [_Rev(r) for r in ancestors_by_head[head]]
+
+ monkeypatch.setattr(db_migrate.ScriptDirectory, "from_config",
staticmethod(lambda _cfg: _ScriptDir()))
+
+
+def test_decide_action_unknown_target_falls_back_to_forward(db_migrate,
monkeypatch):
+ # Empty map: any target is unknown -> conservative forward migrate.
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", {})
+ assert db_migrate.decide_action("9.9.9") == "forward"
+
+
+def test_decide_action_fresh_when_db_unreachable(db_migrate, monkeypatch,
patched_revision_map):
+ def _raise(*_a, **_kw):
+ raise OperationalError("SELECT 1", {}, Exception("unreachable"))
+
+ monkeypatch.setattr(db_migrate.engine, "connect", _raise)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_fresh_when_no_alembic_row(db_migrate, monkeypatch,
patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev=None)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_noop_when_current_equals_target(db_migrate,
monkeypatch, patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_310")
+ assert db_migrate.decide_action("3.1.0") == "noop"
+
+
+def test_decide_action_forward_when_current_is_ancestor_of_target(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ # current=3.0.0, target=3.1.0 -> rev_300 is in target's ancestor set.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_300")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "forward"
+
+
+def test_decide_action_downgrade_when_current_not_in_target_ancestors(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ """Regression test for the original blocker.
+
+ On a real downgrade, the TARGET image's ScriptDirectory does NOT contain
+ ``current_rev`` (current is newer than target). The previous implementation
+ called ``walk_revisions("base", current_rev)`` which raised
+ ``RevisionError`` and never reached the downgrade branch. The fix walks to
+ ``target_rev`` (always present in target's scripts) and checks membership.
+ """
+ # current=3.2.0 (newer), target=3.1.0 (older). rev_320 is unknown to
target.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_320")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "downgrade"
+
+
+# --------------------------------------------------------------------------
+# discover_api_server_pod
+# --------------------------------------------------------------------------
+
+
+def _pod(name, ready=True):
+ pod = types.SimpleNamespace()
+ pod.metadata = types.SimpleNamespace(name=name)
+ pod.status = types.SimpleNamespace(
+ conditions=[types.SimpleNamespace(type="Ready", status="True" if ready
else "False")]
+ )
+ return pod
+
+
+def test_discover_api_server_pod_prefers_ready(db_migrate, monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = [
+ _pod("api-server-old", ready=False),
+ _pod("api-server-new", ready=True),
+ ]
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ assert db_migrate.discover_api_server_pod("airflow") == "api-server-new"
+ fake_api.list_namespaced_pod.assert_called_once_with(
+ namespace="airflow",
+ label_selector="component=api-server",
+ field_selector="status.phase=Running",
+ )
+
+
+def test_discover_api_server_pod_falls_back_to_non_ready(db_migrate,
monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = [_pod("api-server-old",
ready=False)]
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ assert db_migrate.discover_api_server_pod("airflow") == "api-server-old"
+
+
+def test_discover_api_server_pod_raises_when_none(db_migrate, monkeypatch):
+ fake_api = mock.MagicMock()
+ fake_api.list_namespaced_pod.return_value.items = []
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda: fake_api)
+ with pytest.raises(RuntimeError, match="no Running api-server pod"):
+ db_migrate.discover_api_server_pod("airflow")
+
+
+# --------------------------------------------------------------------------
+# run_downgrade_in_api_server
+# --------------------------------------------------------------------------
+
+
+def _fake_stream(returncode):
+ resp = mock.MagicMock()
+ # is_open(): True once so the loop body runs, then False to exit.
+ resp.is_open.side_effect = [True, False]
+ resp.peek_stdout.return_value = False
+ resp.peek_stderr.return_value = False
+ resp.returncode = returncode
+ return resp
+
+
+def test_run_downgrade_returns_zero_on_success(db_migrate, monkeypatch):
+ resp = _fake_stream(returncode=0)
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda:
mock.MagicMock())
+ monkeypatch.setattr(db_migrate, "stream", lambda *a, **kw: resp)
+ assert db_migrate.run_downgrade_in_api_server("p", "ns", "3.0.0") == 0
+
+
+def test_run_downgrade_propagates_nonzero(db_migrate, monkeypatch):
+ resp = _fake_stream(returncode=2)
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda:
mock.MagicMock())
+ monkeypatch.setattr(db_migrate, "stream", lambda *a, **kw: resp)
+ assert db_migrate.run_downgrade_in_api_server("p", "ns", "3.0.0") == 2
+
+
+def test_run_downgrade_treats_missing_returncode_as_failure(db_migrate,
monkeypatch):
+ # Regression: previous code did ``return returncode or 0`` which silently
+ # reported success when the stream closed without an exit code.
+ resp = _fake_stream(returncode=None)
+ monkeypatch.setattr(db_migrate.k8s_config, "load_incluster_config",
lambda: None)
+ monkeypatch.setattr(db_migrate.client, "CoreV1Api", lambda:
mock.MagicMock())
+ monkeypatch.setattr(db_migrate, "stream", lambda *a, **kw: resp)
+ assert db_migrate.run_downgrade_in_api_server("p", "ns", "3.0.0") == 1
+
+
+# --------------------------------------------------------------------------
+# scale_release_workloads_to_zero
+# --------------------------------------------------------------------------
+
+
+def _wl(name):
Review Comment:
Why not make this a fixture?
##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler
``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+The decide_action regression test exists because the first iteration walked
+the target image's ScriptDirectory from base to ``current_rev``, which raises
+``RevisionError`` in the actual downgrade case where ``current_rev`` is newer
+than anything the target image knows about. See
+https://github.com/apache/airflow/issues/68072.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" /
"db_migrate.py"
+
+
+@pytest.fixture(scope="module")
+def db_migrate():
+ """Load chart/files/db_migrate.py as a module.
+
+ The file is normally fed to ``python3 -c`` by the helm job rather than
+ imported, so it lives outside the chart's Python package tree.
+ """
+ spec = importlib.util.spec_from_file_location("chart_db_migrate",
DB_MIGRATE_PATH)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
+@pytest.fixture
+def patched_revision_map(db_migrate, monkeypatch):
+ """Install a stable {version: revision} mapping for decide_action tests."""
+ fake_map = {
+ "3.0.0": "rev_300",
+ "3.1.0": "rev_310",
+ "3.2.0": "rev_320",
+ }
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+ return fake_map
+
+
+def _patch_engine_returning(db_migrate, monkeypatch, current_rev):
+ """Patch ``engine.connect()`` so MigrationContext.get_current_revision()
returns *current_rev*."""
+
+ class _Ctx:
+ def get_current_revision(self):
+ return current_rev
+
+ class _Conn:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ return False
+
+ monkeypatch.setattr(db_migrate.engine, "connect", lambda: _Conn())
+ monkeypatch.setattr(
+ db_migrate.MigrationContext,
+ "configure",
+ staticmethod(lambda _conn: _Ctx()),
+ )
+
+
+def _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head):
Review Comment:
Why not make this a fixture?
##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler
``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+The decide_action regression test exists because the first iteration walked
+the target image's ScriptDirectory from base to ``current_rev``, which raises
+``RevisionError`` in the actual downgrade case where ``current_rev`` is newer
+than anything the target image knows about. See
+https://github.com/apache/airflow/issues/68072.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" /
"db_migrate.py"
+
+
+@pytest.fixture(scope="module")
+def db_migrate():
+ """Load chart/files/db_migrate.py as a module.
+
+ The file is normally fed to ``python3 -c`` by the helm job rather than
+ imported, so it lives outside the chart's Python package tree.
+ """
+ spec = importlib.util.spec_from_file_location("chart_db_migrate",
DB_MIGRATE_PATH)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
+@pytest.fixture
+def patched_revision_map(db_migrate, monkeypatch):
+ """Install a stable {version: revision} mapping for decide_action tests."""
+ fake_map = {
+ "3.0.0": "rev_300",
+ "3.1.0": "rev_310",
+ "3.2.0": "rev_320",
+ }
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+ return fake_map
+
+
+def _patch_engine_returning(db_migrate, monkeypatch, current_rev):
+ """Patch ``engine.connect()`` so MigrationContext.get_current_revision()
returns *current_rev*."""
+
+ class _Ctx:
+ def get_current_revision(self):
+ return current_rev
+
+ class _Conn:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ return False
+
+ monkeypatch.setattr(db_migrate.engine, "connect", lambda: _Conn())
+ monkeypatch.setattr(
+ db_migrate.MigrationContext,
+ "configure",
+ staticmethod(lambda _conn: _Ctx()),
+ )
+
+
+def _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head):
+ """Patch ScriptDirectory.from_config so walk_revisions("base", X) yields
ancestors_by_head[X]."""
+
+ class _Rev:
+ def __init__(self, revision):
+ self.revision = revision
+
+ class _ScriptDir:
+ def walk_revisions(self, base, head):
+ assert base == "base"
+ return [_Rev(r) for r in ancestors_by_head[head]]
+
+ monkeypatch.setattr(db_migrate.ScriptDirectory, "from_config",
staticmethod(lambda _cfg: _ScriptDir()))
+
+
+def test_decide_action_unknown_target_falls_back_to_forward(db_migrate,
monkeypatch):
+ # Empty map: any target is unknown -> conservative forward migrate.
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", {})
+ assert db_migrate.decide_action("9.9.9") == "forward"
+
+
+def test_decide_action_fresh_when_db_unreachable(db_migrate, monkeypatch,
patched_revision_map):
+ def _raise(*_a, **_kw):
+ raise OperationalError("SELECT 1", {}, Exception("unreachable"))
+
+ monkeypatch.setattr(db_migrate.engine, "connect", _raise)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_fresh_when_no_alembic_row(db_migrate, monkeypatch,
patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev=None)
+ assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_noop_when_current_equals_target(db_migrate,
monkeypatch, patched_revision_map):
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_310")
+ assert db_migrate.decide_action("3.1.0") == "noop"
+
+
+def test_decide_action_forward_when_current_is_ancestor_of_target(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ # current=3.0.0, target=3.1.0 -> rev_300 is in target's ancestor set.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_300")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "forward"
+
+
+def test_decide_action_downgrade_when_current_not_in_target_ancestors(
+ db_migrate, monkeypatch, patched_revision_map
+):
+ """Regression test for the original blocker.
+
+ On a real downgrade, the TARGET image's ScriptDirectory does NOT contain
+ ``current_rev`` (current is newer than target). The previous implementation
+ called ``walk_revisions("base", current_rev)`` which raised
+ ``RevisionError`` and never reached the downgrade branch. The fix walks to
+ ``target_rev`` (always present in target's scripts) and checks membership.
+ """
+ # current=3.2.0 (newer), target=3.1.0 (older). rev_320 is unknown to
target.
+ _patch_engine_returning(db_migrate, monkeypatch, current_rev="rev_320")
+ _patch_script_dir(db_migrate, monkeypatch, ancestors_by_head={"rev_310":
["rev_300", "rev_310"]})
+ assert db_migrate.decide_action("3.1.0") == "downgrade"
+
+
+# --------------------------------------------------------------------------
+# discover_api_server_pod
+# --------------------------------------------------------------------------
+
+
+def _pod(name, ready=True):
Review Comment:
Why not make this a fixture?
##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler
``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+The decide_action regression test exists because the first iteration walked
+the target image's ScriptDirectory from base to ``current_rev``, which raises
+``RevisionError`` in the actual downgrade case where ``current_rev`` is newer
+than anything the target image knows about. See
+https://github.com/apache/airflow/issues/68072.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" /
"db_migrate.py"
+
+
+@pytest.fixture(scope="module")
+def db_migrate():
+ """Load chart/files/db_migrate.py as a module.
+
+ The file is normally fed to ``python3 -c`` by the helm job rather than
+ imported, so it lives outside the chart's Python package tree.
+ """
+ spec = importlib.util.spec_from_file_location("chart_db_migrate",
DB_MIGRATE_PATH)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
+@pytest.fixture
+def patched_revision_map(db_migrate, monkeypatch):
+ """Install a stable {version: revision} mapping for decide_action tests."""
+ fake_map = {
+ "3.0.0": "rev_300",
+ "3.1.0": "rev_310",
+ "3.2.0": "rev_320",
+ }
+ monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+ return fake_map
+
+
+def _patch_engine_returning(db_migrate, monkeypatch, current_rev):
Review Comment:
Why not make this a fixture?
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+ """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+ target_rev = _REVISION_HEADS_MAP.get(target)
+ if target_rev is None:
+ # Unknown target version (e.g. dev build). Be conservative: forward
only.
+ return "forward"
+
+ try:
+ with engine.connect() as conn:
+ current_rev =
MigrationContext.configure(conn).get_current_revision()
+ except OperationalError:
+ # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+ # re-surface a real connectivity error with a clearer message.
+ return "fresh"
+
+ if current_rev is None:
+ return "fresh"
+ if current_rev == target_rev:
+ return "noop"
+
+ # Walk the TARGET image's revision graph from base to target_rev. Target's
+ # ScriptDirectory only contains revisions up to target's heads, so we
+ # cannot walk from an unknown current_rev (the downgrade case): the call
+ # would raise ``RevisionError``. Instead, build the ancestor set of target
+ # and check membership.
+ cfg = Config()
+ cfg.set_main_option(
+ "script_location",
+ os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+ )
+ script = ScriptDirectory.from_config(cfg)
+ ancestors_of_target = {rev.revision for rev in
script.walk_revisions("base", target_rev)}
+ if current_rev in ancestors_of_target:
+ # current is an ancestor of target -> moving forward.
+ return "forward"
+ # current is not in target's history -> must be newer than target.
+ return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+ """Return the name of a Running api-server pod in *namespace*."""
+ k8s_config.load_incluster_config()
+ api = client.CoreV1Api()
+ pods = api.list_namespaced_pod(
+ namespace=namespace,
+ label_selector="component=api-server",
+ field_selector="status.phase=Running",
+ ).items
+ if not pods:
+ raise RuntimeError(f"no Running api-server pod found in namespace
{namespace}")
+
+ # Prefer Ready pods so we don't pick one mid-rollout.
+ ready = [
+ p for p in pods if any(c.type == "Ready" and c.status == "True" for c
in (p.status.conditions or []))
+ ]
+ return (ready or pods)[0].metadata.name
+
+
+def run_downgrade_in_api_server(pod_name: str, namespace: str, target_version:
str) -> int:
+ """Exec ``airflow db downgrade`` in the api-server pod via the Kubernetes
API."""
+ k8s_config.load_incluster_config()
+ api = client.CoreV1Api()
+ command = ["airflow", "db", "downgrade", "--to-version", target_version,
"--yes"]
+
+ resp = stream(
+ api.connect_get_namespaced_pod_exec,
+ pod_name,
+ namespace,
+ container="api-server",
+ command=command,
+ stderr=True,
+ stdin=False,
+ stdout=True,
+ tty=False,
+ _preload_content=False,
+ )
+ while resp.is_open():
+ resp.update(timeout=1)
+ if resp.peek_stdout():
+ sys.stdout.write(resp.read_stdout())
+ sys.stdout.flush()
+ if resp.peek_stderr():
+ sys.stderr.write(resp.read_stderr())
+ sys.stderr.flush()
+ returncode = resp.returncode
+ resp.close()
+ # Treat an unknown exit code as failure rather than success: if the stream
+ # closes without populating returncode we cannot prove the downgrade ran.
+ if returncode is None:
+ sys.stderr.write("[db_migrate] downgrade exec stream closed without an
exit code\n")
+ return 1
+ return returncode
+
+
+# Components that talk to the metadata DB and are managed by this helm release.
+# A downgrade-then-scale-back sequence must drain all of them so no OLD code
+# keeps talking to the now-downgraded schema before helm rolls in TARGET pods.
+_DB_TOUCHING_COMPONENTS = (
+ "api-server",
+ "scheduler",
+ "triggerer",
+ "dag-processor",
+ "worker",
+)
+
+
+def scale_release_workloads_to_zero(namespace: str, release_name: str,
timeout_seconds: int = 300) -> None:
+ """Scale all DB-touching workloads of this release to 0 and wait for drain.
+
+ Helm will patch the same Deployments/StatefulSets back to ``replicas: N``
+ when it applies the post-hook manifests, so we deliberately do NOT scale
+ them up again here.
+ """
+ k8s_config.load_incluster_config()
+ apps = client.AppsV1Api()
+ core = client.CoreV1Api()
+
+ selector = f"release={release_name},component in
({','.join(_DB_TOUCHING_COMPONENTS)})"
+ scale_body = {"spec": {"replicas": 0}}
+
+ deployments = apps.list_namespaced_deployment(namespace,
label_selector=selector).items
+ statefulsets = apps.list_namespaced_stateful_set(namespace,
label_selector=selector).items
+
+ for d in deployments:
+ print(f"[db_migrate] scaling Deployment/{d.metadata.name} to 0",
flush=True)
+ apps.patch_namespaced_deployment_scale(d.metadata.name, namespace,
scale_body)
+ for s in statefulsets:
+ print(f"[db_migrate] scaling StatefulSet/{s.metadata.name} to 0",
flush=True)
+ apps.patch_namespaced_stateful_set_scale(s.metadata.name, namespace,
scale_body)
+
+ deadline = time.monotonic() + timeout_seconds
+ while time.monotonic() < deadline:
+ remaining = core.list_namespaced_pod(namespace,
label_selector=selector).items
+ if not remaining:
+ print("[db_migrate] all DB-touching pods drained.", flush=True)
+ return
+ print(
+ f"[db_migrate] waiting for {len(remaining)} pod(s) to
terminate...",
+ flush=True,
+ )
+ time.sleep(2)
+ raise TimeoutError(f"DB-touching pods did not drain within
{timeout_seconds}s after scale-to-zero")
Review Comment:
I think it is fair and usual that worker Pods might have a longer job
running and might need up to an hour to scale down. Can you make it possible
to set a higher timeout, or to ignore longer worker runtimes and leave them
running? Then again ... the workload probably "dies" anyway if the API server
is scaled down, because then no heartbeats can be sent...
##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current -> no-op (idempotent check)
+* target > current -> ``airflow db migrate`` inside this job's container
+ (uses the TARGET image, which ships forward scripts).
+* target < current -> ``airflow db downgrade --to-version <target>``
+ executed inside the still-running api-server pod (the OLD image still
+ ships the reverse scripts), followed by scaling every DB-touching
+ workload (api-server, scheduler, triggerer, dag-processor, worker) to
+ zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+ then patches those workloads back to ``replicas: N`` with the TARGET
+ image as the upgrade proceeds, so the cluster comes back up cleanly on
+ the target version. This means a downgrade trades the otherwise-broken
+ rolling-update window for a brief outage (which is unavoidable when the
+ schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being
upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+ only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+ """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+ target_rev = _REVISION_HEADS_MAP.get(target)
+ if target_rev is None:
+ # Unknown target version (e.g. dev build). Be conservative: forward
only.
+ return "forward"
+
+ try:
+ with engine.connect() as conn:
+ current_rev =
MigrationContext.configure(conn).get_current_revision()
+ except OperationalError:
+ # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+ # re-surface a real connectivity error with a clearer message.
+ return "fresh"
+
+ if current_rev is None:
+ return "fresh"
+ if current_rev == target_rev:
+ return "noop"
+
+ # Walk the TARGET image's revision graph from base to target_rev. Target's
+ # ScriptDirectory only contains revisions up to target's heads, so we
+ # cannot walk from an unknown current_rev (the downgrade case): the call
+ # would raise ``RevisionError``. Instead, build the ancestor set of target
+ # and check membership.
+ cfg = Config()
+ cfg.set_main_option(
+ "script_location",
+ os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+ )
+ script = ScriptDirectory.from_config(cfg)
+ ancestors_of_target = {rev.revision for rev in
script.walk_revisions("base", target_rev)}
+ if current_rev in ancestors_of_target:
+ # current is an ancestor of target -> moving forward.
+ return "forward"
+ # current is not in target's history -> must be newer than target.
+ return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+ """Return the name of a Running api-server pod in *namespace*."""
+ k8s_config.load_incluster_config()
+ api = client.CoreV1Api()
+ pods = api.list_namespaced_pod(
+ namespace=namespace,
+ label_selector="component=api-server",
+ field_selector="status.phase=Running",
+ ).items
+ if not pods:
+ raise RuntimeError(f"no Running api-server pod found in namespace
{namespace}")
+
+ # Prefer Ready pods so we don't pick one mid-rollout.
+ ready = [
+ p for p in pods if any(c.type == "Ready" and c.status == "True" for c
in (p.status.conditions or []))
+ ]
+ return (ready or pods)[0].metadata.name
+
+
+def run_downgrade_in_api_server(pod_name: str, namespace: str, target_version:
str) -> int:
Review Comment:
As the API server might be "rolled over", can you add a `@retry()` decorator
to this for resiliency?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]