jykae commented on code in PR #68074:
URL: https://github.com/apache/airflow/pull/68074#discussion_r3387293319


##########
chart/templates/rbac/migrate-database-job-role.yaml:
##########
@@ -0,0 +1,85 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+#######################################
+## Airflow Migrate Database Job Role
+##
+## Grants pods + pods/exec in the release namespace so the migrate-database-job
+## can exec ``airflow db downgrade`` inside the still-running api-server pod on
+## a chart downgrade. The forward-migrate branch never uses these permissions.
+#######################################
+{{- if and .Values.rbac.create .Values.migrateDatabaseJob.enabled }}
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: {{ include "airflow.fullname" . }}-migrate-database-job-role
+  namespace: "{{ .Release.Namespace }}"
+  labels:
+    tier: airflow
+    component: run-airflow-migrations
+    release: {{ .Release.Name }}
+    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
+    heritage: {{ .Release.Service }}
+    {{- with .Values.labels }}
+      {{- toYaml . | nindent 4 }}
+    {{- end }}
+  {{- if .Values.migrateDatabaseJob.useHelmHooks }}
+  annotations:
+    helm.sh/hook: post-install,pre-upgrade
+    helm.sh/hook-weight: "-5"
+    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded
+  {{- end }}
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - "pods"
+    verbs:
+      - "get"
+      - "list"
+  - apiGroups:
+      - ""
+    resources:
+      - "pods/exec"
+    # The reconciler execs via ``connect_get_namespaced_pod_exec`` (an HTTP GET
+    # to the exec subresource), so the API server authorizes it under the
+    # ``get`` verb -- not ``create`` (which is what a POST-based exec needs).
+    verbs:
+      - "get"

Review Comment:
   Good point on compatibility. Reverted to granting both `create` and `get` on 
`pods/exec` so the downgrade exec works regardless of whether the 
apiserver/client path authorizes it as a POST (`create`) or a GET-based upgrade 
(`get`). Updated the role comment and the RBAC test assertion accordingly.
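A sketch of the kind of assertion the updated RBAC test could make. `role_allows` and `MIGRATE_JOB_RULES` are hypothetical names. The rules mirror the Role after this change (`get`/`list` on `pods`, `create` and `get` on `pods/exec`), shaped as `yaml.safe_load` would return them from the rendered template. For brevity the helper ignores `*` wildcards, which real Kubernetes RBAC also honors.

```python
from __future__ import annotations


def role_allows(rules: list[dict], resource: str, verb: str, api_group: str = "") -> bool:
    """Return True if any rule grants ``verb`` on ``resource`` in ``api_group``.

    Simplified: exact string matches only, no ``*`` wildcard handling.
    """
    return any(
        api_group in rule.get("apiGroups", [])
        and resource in rule.get("resources", [])
        and verb in rule.get("verbs", [])
        for rule in rules
    )


# Hypothetical parsed form of the migrate-database-job Role rules after the fix.
MIGRATE_JOB_RULES = [
    {"apiGroups": [""], "resources": ["pods"], "verbs": ["get", "list"]},
    {"apiGroups": [""], "resources": ["pods/exec"], "verbs": ["create", "get"]},
]

# The exec must be authorized whether it arrives as a POST (create)
# or as a GET-based upgrade (get).
assert role_allows(MIGRATE_JOB_RULES, "pods/exec", "create")
assert role_allows(MIGRATE_JOB_RULES, "pods/exec", "get")
```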



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,355 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Optional env:
+
+* ``MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS`` - how long to wait for DB-touching
+  pods to terminate after scale-to-zero on a downgrade. Defaults to 300.
+  Increase when long-running worker tasks need more time to wind down.
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.migration import MigrationContext
+from packaging.version import Version
+from sqlalchemy import text
+from sqlalchemy.exc import OperationalError
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    stop_after_attempt,
+    stop_after_delay,
+    wait_exponential,
+    wait_fixed,
+)
+
+from airflow.settings import engine
+
+# The kubernetes client is only used by the downgrade branch (exec ``airflow db
+# downgrade`` in the api-server pod, then scale DB-touching workloads to zero).
+# The base Airflow image does not ship the kubernetes client unless the
+# cncf.kubernetes provider is installed, so importing it unconditionally would
+# break the far more common forward / no-op / fresh paths. Import it lazily and
+# only fail when a downgrade is actually selected.
+try:
+    from kubernetes import client, config as k8s_config
+    from kubernetes.stream import stream
+
+    KUBERNETES_IMPORT_ERROR: ImportError | None = None
+except ImportError as exc:  # pragma: no cover - only on images without the k8s client
+    client = k8s_config = stream = None  # type: ignore[assignment]
+    KUBERNETES_IMPORT_ERROR = exc
+
+# ``_REVISION_HEADS_MAP`` is private today; a public accessor is being tracked
+# upstream so the chart can drop the leading underscore in a future release.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def _resolve_target_rev(target: str) -> str | None:

Review Comment:
   Fixed. `_resolve_target_rev` now wraps `Version(target)` in `try/except 
InvalidVersion` and returns `None` for an unparseable `AIRFLOW_TARGET_VERSION`, 
so `decide_action` treats it as an unknown target and falls back to a 
conservative forward migrate instead of crashing the job. Added 
`test_decide_action_unparseable_target_falls_back_to_forward` as a regression 
test.
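A minimal, stdlib-only sketch of the classification described here, with the unparseable-target guard folded into the nearest-lower lookup. Assumptions: `decide_action(target, current_rev, heads)` is a hypothetical pure variant (the script's own `decide_action(target)` reads the current revision through `MigrationContext`), and `parse_version`/`InvalidVersion` are simplified stand-ins for `packaging.version.Version`/`InvalidVersion` that accept dotted integers only, so they reject pre-release strings that `packaging` would parse. The branch order is inferred from the tests quoted below, not taken from the script.

```python
from __future__ import annotations


class InvalidVersion(ValueError):
    """Hypothetical stand-in for ``packaging.version.InvalidVersion``."""


def parse_version(value: str) -> tuple[int, ...]:
    """Parse a dotted-integer version (simplified: no pre-release support)."""
    try:
        return tuple(int(part) for part in value.split("."))
    except ValueError:
        raise InvalidVersion(value) from None


def resolve_target_rev(target: str, heads: dict[str, str]) -> str | None:
    """Alembic head for ``target``, else the head of the nearest lower mapped version."""
    if target in heads:
        return heads[target]
    try:
        target_v = parse_version(target)
    except InvalidVersion:
        return None  # misconfigured airflowVersion: treat as an unknown target
    candidates = [(parse_version(v), rev) for v, rev in heads.items() if parse_version(v) <= target_v]
    if not candidates:
        return None
    return max(candidates, key=lambda c: c[0])[1]


def decide_action(target: str, current_rev: str | None, heads: dict[str, str]) -> str:
    """Return one of "forward", "fresh", "noop" or "downgrade"."""
    target_rev = resolve_target_rev(target, heads)
    if target_rev is None:
        return "forward"  # unknown or unparseable target: conservative forward
    if current_rev is None:
        return "fresh"  # no alembic_version row yet
    if current_rev == target_rev:
        return "noop"
    # Version-aware reverse lookup: which mapped versions carry the DB's revision?
    current_versions = [parse_version(v) for v, rev in heads.items() if rev == current_rev]
    if not current_versions:
        return "forward"  # dev / intermediate revision: never guess a downgrade
    return "downgrade" if min(current_versions) > parse_version(target) else "forward"
```

With the test suite's map `{"3.0.0": "rev_300", "3.1.0": "rev_310", "3.2.0": "rev_320"}`, `decide_action("3.1.0", "rev_320", heads)` classifies as a downgrade, while `decide_action("not-a-version", "rev_320", heads)` falls back to forward instead of raising.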



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,355 @@
+def _resolve_target_rev(target: str) -> str | None:
+    """Return the alembic head for *target*, falling back to the nearest lower mapped version.
+
+    ``_REVISION_HEADS_MAP`` does not list every patch release — patches often
+    share the head of their minor's first release. Mirror Airflow's own CLI
+    behaviour by picking the highest mapped version that is ``<= target``.
+    """
+    if target in _REVISION_HEADS_MAP:
+        return _REVISION_HEADS_MAP[target]
+    target_v = Version(target)
+    candidates = [(Version(v), rev) for v, rev in _REVISION_HEADS_MAP.items() if Version(v) <= target_v]

Review Comment:
   Same fix as the sibling comment: `_resolve_target_rev` now catches 
`InvalidVersion` and returns `None` (→ conservative forward) so a misconfigured 
`airflowVersion` no longer crashes the job. Covered by a new regression test.



##########
chart/values.schema.json:
##########
@@ -6731,19 +6731,21 @@
                     "default": null
                 },
                 "args": {
-                    "description": "Args to use when running migrate database job (templated).",
+                    "description": "Args to use when running migrate database job (templated). When unset, the chart embeds a script that reconciles the metadata DB bidirectionally (forward migrate, no-op, or downgrade via the still-running api-server pod).",

Review Comment:
   Updated the `migrateDatabaseJob.args` schema description to include the 
fresh-install path (no alembic row) and to clarify that an explicit list — 
including `[]` — overrides and disables the embedded reconciler, matching 
`values.yaml` and the newsfragment.
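The override rule described here hinges on telling an unset value apart from an empty list. A Python sketch of that rule, assuming a hypothetical `resolve_migrate_job_args` helper and a placeholder `EMBEDDED_RECONCILER` argv (the chart itself implements this in Helm templating; the placeholder only stands in for feeding `files/db_migrate.py` to `python -c`):

```python
from __future__ import annotations

# Hypothetical placeholder for the embedded reconciler invocation.
EMBEDDED_RECONCILER = ["python", "-c", "<contents of files/db_migrate.py>"]


def resolve_migrate_job_args(args: list[str] | None) -> list[str]:
    """Return the container args for the migrate-database job.

    ``None`` (unset) selects the embedded reconciler; any explicit list,
    including ``[]``, is an override and disables it.
    """
    # ``if not args`` would be wrong here: it treats ``[]`` the same as unset.
    if args is None:
        return list(EMBEDDED_RECONCILER)
    return list(args)
```

The explicit `is None` check is the design point: Go's `text/template` treats an empty list as false, so a truthiness check such as `{{ if .Values.migrateDatabaseJob.args }}` would send `[]` down the same branch as an unset value.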



##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,511 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for the embedded bidirectional reconciler ``chart/files/db_migrate.py``.
+
+These exercise the runtime behaviour of the script itself (the helm template
+tests only check that the script is embedded with the right env/args/RBAC).
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import pathlib
+import types
+from unittest import mock
+
+import pytest
+from kubernetes import client as k8s_client
+from sqlalchemy.exc import OperationalError
+
+DB_MIGRATE_PATH = pathlib.Path(__file__).resolve().parents[3] / "files" / "db_migrate.py"
+
+
+@pytest.fixture(scope="module")
+def db_migrate():
+    """Load ``chart/files/db_migrate.py`` as a module.
+
+    The file is normally fed to ``python -c`` by the helm job rather than
+    imported, so it lives outside the chart's Python package tree.
+    """
+    spec = importlib.util.spec_from_file_location("chart_db_migrate", DB_MIGRATE_PATH)
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+# --------------------------------------------------------------------------
+# Shared fixtures
+# --------------------------------------------------------------------------
+
+
+@pytest.fixture
+def patched_revision_map(db_migrate, monkeypatch):
+    """Install a stable ``{version: revision}`` mapping for ``decide_action`` tests."""
+    fake_map = {
+        "3.0.0": "rev_300",
+        "3.1.0": "rev_310",
+        "3.2.0": "rev_320",
+    }
+    monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", fake_map)
+    return fake_map
+
+
+@pytest.fixture
+def patch_engine_returning(db_migrate, monkeypatch):
+    """Factory: patch ``engine.connect()`` so ``get_current_revision()`` returns the given value."""
+
+    def _patch(current_rev):
+        class _Ctx:
+            def get_current_revision(self):
+                return current_rev
+
+        class _Conn:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, *_):
+                return False
+
+        monkeypatch.setattr(db_migrate.engine, "connect", lambda: _Conn())
+        monkeypatch.setattr(
+            db_migrate.MigrationContext,
+            "configure",
+            staticmethod(lambda _conn: _Ctx()),
+        )
+
+    return _patch
+
+
+@pytest.fixture
+def make_pod():
+    """Factory: build a fake api-server pod with optional ``Ready`` condition."""
+
+    def _build(name, ready=True):
+        return types.SimpleNamespace(
+            metadata=types.SimpleNamespace(name=name),
+            status=types.SimpleNamespace(
+                conditions=[types.SimpleNamespace(type="Ready", status="True" if ready else "False")]
+            ),
+        )
+
+    return _build
+
+
+@pytest.fixture
+def fake_stream():
+    """Factory: build a fake kubernetes exec-stream response with the given exit code."""
+
+    def _build(returncode):
+        resp = mock.MagicMock()
+        # is_open(): True once so the loop body runs, then False to exit.
+        resp.is_open.side_effect = [True, False]
+        resp.peek_stdout.return_value = False
+        resp.peek_stderr.return_value = False
+        resp.returncode = returncode
+        return resp
+
+    return _build
+
+
+@pytest.fixture
+def make_workload():
+    """Factory: build a fake Deployment / StatefulSet object with a ``metadata.name``."""
+
+    def _build(name):
+        return types.SimpleNamespace(metadata=types.SimpleNamespace(name=name))
+
+    return _build
+
+
+# --------------------------------------------------------------------------
+# decide_action
+# --------------------------------------------------------------------------
+
+
+def test_decide_action_unknown_target_falls_back_to_forward(db_migrate, monkeypatch):
+    # Empty map: any target is unknown -> conservative forward migrate.
+    monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", {})
+    assert db_migrate.decide_action("9.9.9") == "forward"
+
+
+def test_decide_action_propagates_db_error(db_migrate, monkeypatch, patched_revision_map):
+    """``decide_action`` assumes the DB is already reachable.
+
+    The wait is the responsibility of :func:`main` via
+    :func:`_wait_for_db_ready`; ``decide_action`` itself must surface a
+    connectivity failure rather than silently treating it as a fresh
+    install (which would then run ``airflow db migrate`` against a still-
+    unreachable DB and cause ``BackoffLimitExceeded``).
+    """
+
+    def _raise(*_a, **_kw):
+        raise OperationalError("SELECT 1", {}, Exception("unreachable"))
+
+    monkeypatch.setattr(db_migrate.engine, "connect", _raise)
+    with pytest.raises(OperationalError):
+        db_migrate.decide_action("3.1.0")
+
+
+def test_decide_action_fresh_when_no_alembic_row(patched_revision_map, db_migrate, patch_engine_returning):
+    patch_engine_returning(current_rev=None)
+    assert db_migrate.decide_action("3.1.0") == "fresh"
+
+
+def test_decide_action_noop_when_current_equals_target(
+    patched_revision_map, db_migrate, patch_engine_returning
+):
+    patch_engine_returning(current_rev="rev_310")
+    assert db_migrate.decide_action("3.1.0") == "noop"
+
+
+def test_decide_action_forward_when_current_is_older_than_target(
+    patched_revision_map, db_migrate, patch_engine_returning
+):
+    # current=3.0.0 (rev_300), target=3.1.0 (rev_310) -> forward.
+    patch_engine_returning(current_rev="rev_300")
+    assert db_migrate.decide_action("3.1.0") == "forward"
+
+
+def test_decide_action_downgrade_when_current_is_newer_than_target(
+    patched_revision_map, db_migrate, patch_engine_returning
+):
+    """Regression test for the original blocker.
+
+    On a real downgrade the TARGET image cannot resolve a revision newer than
+    its own head; relying on ``ScriptDirectory.walk_revisions`` raised
+    ``RevisionError`` and skipped the downgrade branch entirely. The new
+    implementation uses a version-aware reverse lookup so the comparison
+    works no matter which image is loaded.
+    """
+    # current=3.2.0 (rev_320, newer), target=3.1.0 (older) -> downgrade.
+    patch_engine_returning(current_rev="rev_320")
+    assert db_migrate.decide_action("3.1.0") == "downgrade"
+
+
+def test_decide_action_forward_when_current_revision_is_unknown(
+    patched_revision_map, db_migrate, patch_engine_returning
+):
+    """Dev / intermediate alembic rev -> conservative forward, never downgrade."""
+    patch_engine_returning(current_rev="rev_unknown")
+    assert db_migrate.decide_action("3.1.0") == "forward"
+
+
+def test_decide_action_resolves_patch_version_via_nearest_lower(
+    db_migrate, monkeypatch, patch_engine_returning
+):
+    """Regression test for the Copilot-flagged patch-version mapping bug.
+
+    ``_REVISION_HEADS_MAP`` does not list every patch (e.g. ``3.2.2`` is not
+    a key — its head is the same as ``3.2.0``'s). The resolver must fall back
+    to the highest lower mapped version so a deploy targeting a patch release
+    is correctly classified as a no-op or downgrade against the same rev.
+    """
+    monkeypatch.setattr(db_migrate, "_REVISION_HEADS_MAP", {"3.2.0": "rev_320"})
+    patch_engine_returning(current_rev="rev_320")
+    # 3.2.2 must resolve to rev_320 -> noop, not "forward" (the previous bug).
+    assert db_migrate.decide_action("3.2.2") == "noop"
+
+
+# --------------------------------------------------------------------------
+# discover_api_server_pod
+# --------------------------------------------------------------------------
+
+
+def _call_discover_no_retry(db_migrate, namespace):
+    """Invoke the underlying function, bypassing the @retry wrapper backoff."""
+    return db_migrate.discover_api_server_pod.retry_with(stop=lambda _r: True)(namespace)

Review Comment:
   Removed the unused `_call_discover_no_retry` helper.
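The `make_pod` and `fake_stream` fixtures quoted above describe the objects the downgrade path consumes: pods carrying a `Ready` condition, and an exec stream exposing `is_open()`, `peek_stdout()`, `peek_stderr()` and `returncode`. A minimal sketch of both steps, assuming discovery picks the first pod whose `Ready` condition is `"True"` and that the exec is opened with `_preload_content=False` so the kubernetes client returns a `WSClient`. `is_ready`, `pick_ready_pod` and `drain_exec_stream` are hypothetical names, not the script's `discover_api_server_pod`.

```python
from __future__ import annotations

import sys


def is_ready(pod) -> bool:
    """True if the pod has a ``Ready`` condition whose status is ``"True"``."""
    conditions = getattr(pod.status, "conditions", None) or []
    return any(c.type == "Ready" and c.status == "True" for c in conditions)


def pick_ready_pod(pods) -> str:
    """Return the name of the first Ready pod.

    Raises instead of returning None so a retry wrapper (such as tenacity's
    ``@retry``) can back off and try again while pods are still starting.
    """
    for pod in pods:
        if is_ready(pod):
            return pod.metadata.name
    raise RuntimeError("no Ready api-server pod found")


def drain_exec_stream(resp, timeout: float = 1.0) -> int | None:
    """Pump a WSClient-style exec stream until it closes; return the exit code."""
    while resp.is_open():
        resp.update(timeout=timeout)
        if resp.peek_stdout():
            sys.stdout.write(resp.read_stdout())
        if resp.peek_stderr():
            sys.stderr.write(resp.read_stderr())
    resp.close()
    return resp.returncode
```

Raising on "no Ready pod" keeps the retry policy outside the lookup itself, which is also what lets a test call the unwrapped function without waiting on backoff.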



-- 
This is an automated message from the Apache Git Service.