Miretpl commented on code in PR #68074:
URL: https://github.com/apache/airflow/pull/68074#discussion_r3407864073


##########
chart/templates/rbac/migrate-database-job-role.yaml:
##########
@@ -0,0 +1,88 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+#######################################
+## Airflow Migrate Database Job Role
+##
+## Grants pods + pods/exec in the release namespace so the migrate-database-job
+## can exec ``airflow db downgrade`` inside the still-running api-server pod on
+## a chart downgrade. The forward-migrate branch never uses these permissions.
+#######################################
+{{- if and .Values.rbac.create .Values.migrateDatabaseJob.enabled }}

Review Comment:
   This will only be needed when the relevant args value is set. Could you
   add a condition for that too? We should not generate resources that are
   not needed.



##########
chart/values.yaml:
##########
@@ -2128,13 +2128,18 @@ migrateDatabaseJob:
   command: ~
 
   # Args to use when running the migrate database job (templated).
-  args:
-    - "bash"
-    - "-c"
-    - >-
-      exec \
+  # When unset (the default), the chart runs a bidirectional reconciliation
+  # script that picks one of fresh install, forward migrate, no-op, or exec
+  # downgrade against the running api-server, depending on whether the chart
+  # is being installed, upgraded, or downgraded. Set to an explicit list
+  # (including ``[]``) to override.
+  args: ~

Review Comment:
   Given the discussion about the potential instability of this feature, I
   would keep the default forward-only and make sure that users are aware of
   the downsides of the current solution (we would also need documentation
   for it).



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -372,7 +444,13 @@ def test_command_and_args_overrides(self, command, args):
         )
 
         assert command == jmespath.search("spec.template.spec.containers[0].command", docs[0])
-        assert args == jmespath.search("spec.template.spec.containers[0].args", docs[0])
+        if args is None:
+            # When args is unset, the chart's bidirectional default kicks in
+            # (covered in detail by other tests in this class).
+            rendered_args = jmespath.search("spec.template.spec.containers[0].args", docs[0])
+            assert rendered_args[:2] == ["python", "-c"]
+        else:
+            assert args == jmespath.search("spec.template.spec.containers[0].args", docs[0])

Review Comment:
   I think that these should be two separate tests to make debugging easier and 
provide more clarity too.
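
A minimal sketch of what such a split could look like. `render_args` is a hypothetical stand-in for the `render_chart` plus `jmespath.search` step, used only so the snippet runs without Helm; the test names are illustrative and not taken from the PR.

```python
# Hypothetical stand-in for rendering the chart and reading
# spec.template.spec.containers[0].args; this is not the real chart helper.
DEFAULT_ARGS = ["python", "-c", "<embedded reconciliation script>"]


def render_args(args):
    """Return the args the Job would get: the override if set, else the default."""
    return DEFAULT_ARGS if args is None else args


def test_args_override_is_rendered_verbatim():
    # An explicit list must be passed through unchanged.
    custom_args = ["bash", "-c", "exec airflow db migrate"]
    assert render_args(custom_args) == custom_args


def test_unset_args_fall_back_to_default_script():
    # Only the unset case is allowed to produce the chart default.
    assert render_args(None)[:2] == ["python", "-c"]
```

With one behaviour per test, a failure points directly at either the override path or the default path.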



##########
chart/templates/jobs/migrate-database-job-serviceaccount.yaml:
##########
@@ -35,8 +35,17 @@ metadata:
     {{- if or .Values.labels .Values.migrateDatabaseJob.labels }}
       {{- mustMerge .Values.migrateDatabaseJob.labels .Values.labels | toYaml | nindent 4 }}
     {{- end }}
+  {{- $annotations := dict }}
+  {{- if .Values.migrateDatabaseJob.useHelmHooks }}
+    {{- $_ := set $annotations "helm.sh/hook" "post-install,pre-upgrade" }}
+    {{- $_ := set $annotations "helm.sh/hook-weight" "-5" }}

Review Comment:
   ```suggestion
       {{- $_ := set $annotations "helm.sh/hook-weight" "0" }}
   ```
   Should be enough
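
A small sketch of why `"0"` would already be enough, assuming the Job keeps the hook weight `"1"` asserted by `test_should_set_correct_helm_hooks_weight` and relying on Helm running hooks in ascending weight order. The resource names below are illustrative only.

```python
# Illustrative hook list; names are hypothetical, weights are strings as in
# the helm.sh/hook-weight annotation.
hooks = [
    {"name": "migrate-database-job", "weight": "1"},
    {"name": "migrate-database-job-serviceaccount", "weight": "0"},
    {"name": "migrate-database-job-role", "weight": "0"},
    {"name": "migrate-database-job-rolebinding", "weight": "0"},
]

# Hooks run in ascending weight order, so anything below the Job's weight
# is created before the Job starts.
ordered = sorted(hooks, key=lambda hook: int(hook["weight"]))
execution_order = [hook["name"] for hook in ordered]
```

Any weight lower than the Job's own weight gives the same ordering, so `"-5"` adds nothing over `"0"` in this setup.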



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -279,6 +279,77 @@ def test_should_set_correct_helm_hooks_weight(self):
         annotations = jmespath.search("metadata.annotations", docs[0])
         assert annotations["helm.sh/hook-weight"] == "1"
 
+    def test_default_hooks_run_post_install_pre_upgrade(self):
+        # post-install: the chart's bundled postgres is created as a regular
+        # release resource, so the job needs the DB to exist before it runs.
+        # pre-upgrade: the reconciler must run *before* helm rolls the new
+        # image, so on a downgrade it can exec ``airflow db downgrade`` in the
+        # still-running OLD api-server pod (the OLD image still ships the
+        # reverse alembic scripts; the new one does not).
+        # Tracked in https://github.com/apache/airflow/issues/68072.
+        docs = render_chart(
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        annotations = jmespath.search("metadata.annotations", docs[0])
+        assert annotations["helm.sh/hook"] == "post-install,pre-upgrade"
+
+    def test_default_args_contain_bidirectional_migrate_script(self):
+        docs = render_chart(
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        args = jmespath.search("spec.template.spec.containers[0].args", docs[0])
+        assert args[:2] == ["python", "-c"]
+        # Confirm the embedded script is the bidirectional reconciler.
+        assert "AIRFLOW_TARGET_VERSION" in args[2]

Review Comment:
   I understand why it is in the envs, but why is it in args?



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,384 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Optional env:
+
+* ``MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS`` - how long to wait for DB-touching
+  pods to terminate after scale-to-zero on a downgrade. Defaults to 300.
+  Increase when long-running worker tasks need more time to wind down.
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.migration import MigrationContext
+from packaging.version import InvalidVersion, Version
+from sqlalchemy import text
+from sqlalchemy.exc import OperationalError
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    stop_after_attempt,
+    stop_after_delay,
+    wait_exponential,
+    wait_fixed,
+)
+
+from airflow.settings import engine
+
+# The kubernetes client is only used by the downgrade branch (exec ``airflow db
+# downgrade`` in the api-server pod, then scale DB-touching workloads to zero).
+# The base Airflow image does not ship the kubernetes client unless the
+# cncf.kubernetes provider is installed, so importing it unconditionally would
+# break the far more common forward / no-op / fresh paths. Import it lazily and
+# only fail when a downgrade is actually selected.
+try:
+    from kubernetes import client, config as k8s_config
+    from kubernetes.stream import stream
+
+    KUBERNETES_IMPORT_ERROR: ImportError | None = None
+except ImportError as exc:  # pragma: no cover - only on images without the k8s client
+    client = k8s_config = stream = None  # type: ignore[assignment]
+    KUBERNETES_IMPORT_ERROR = exc
+
+# ``_REVISION_HEADS_MAP`` is private today; a public accessor is being tracked
+# upstream so the chart can drop the leading underscore in a future release.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def _int_env(name: str, default: int) -> int:
+    """Return integer env var *name*, or *default* when unset.
+
+    These knobs come from chart config; a typo (e.g. ``"5m"``) should fail with
+    an actionable message rather than an opaque ``ValueError`` traceback deep in
+    the job logs.
+    """
+    raw = os.environ.get(name)
+    if raw is None:
+        return default
+    try:
+        return int(raw)
+    except ValueError:
+        raise SystemExit(f"{name} must be an integer number of seconds, got {raw!r}")
+
+
+def _resolve_target_rev(target: str) -> str | None:
+    """Return the alembic head for *target*, falling back to the nearest lower mapped version.
+
+    ``_REVISION_HEADS_MAP`` does not list every patch release — patches often
+    share the head of their minor's first release. Mirror Airflow's own CLI
+    behaviour by picking the highest mapped version that is ``<= target``.
+    """
+    if target in _REVISION_HEADS_MAP:
+        return _REVISION_HEADS_MAP[target]
+    try:
+        target_v = Version(target)
+    except InvalidVersion:
+        # ``AIRFLOW_TARGET_VERSION`` comes from chart config; a nonstandard or
+        # misconfigured ``airflowVersion`` (e.g. a dev build) is not PEP 440
+        # parseable. Treat it as an unknown target so :func:`decide_action`
+        # falls back to a conservative forward migrate instead of crashing.
+        return None
+    candidates = [(Version(v), rev) for v, rev in _REVISION_HEADS_MAP.items() if Version(v) <= target_v]
+    if not candidates:
+        return None
+    return max(candidates, key=lambda pair: pair[0])[1]
+
+
+def _db_connect_stop(retry_state):
+    # Evaluate ``DB_CONNECT_MAX_WAIT_SECONDS`` at retry time (not import time)
+    # so that operators -- and unit tests -- can tune the wait window without
+    # reloading the module. ``/entrypoint`` skips its DB-wait for non-airflow
+    # commands (we run as ``python -c ...``), so on a fresh install with a
+    # bundled postgres still starting, the first connect attempt races the DB.
+    # Default 120s matches the entrypoint's
+    # ``CONNECTION_CHECK_MAX_COUNT`` * ``CONNECTION_CHECK_SLEEP_TIME``.
+    delay = _int_env("DB_CONNECT_MAX_WAIT_SECONDS", 120)
+    return stop_after_delay(delay)(retry_state)
+
+
+@retry(
+    stop=_db_connect_stop,
+    wait=wait_fixed(3),
+    retry=retry_if_exception_type(OperationalError),
+    reraise=True,
+)
+def _wait_for_db_ready() -> None:
+    """Block until the metadata DB accepts a connection.
+
+    Called once at the top of :func:`main` so that *every* downstream step
+    (``decide_action``, ``airflow db migrate`` subprocess, ``kubernetes``
+    api-server pod exec) can assume the DB is reachable. Without this, the
+    subprocess branch had no DB-wait of its own and would fail immediately
+    on a fresh install where the bundled postgres was still initialising,
+    causing ``BackoffLimitExceeded`` on the Job.
+    """
+    with engine.connect() as conn:
+        conn.execute(text("SELECT 1"))
+
+
+def decide_action(target: str) -> str:
+    """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``.
+
+    Assumes the DB is already reachable -- :func:`_wait_for_db_ready` must
+    have been called first.
+    """
+    target_rev = _resolve_target_rev(target)
+    if target_rev is None:
+        # Unknown target version (e.g. dev build). Be conservative: forward only.
+        return "forward"
+
+    with engine.connect() as conn:
+        current_rev = MigrationContext.configure(conn).get_current_revision()
+
+    if current_rev is None:
+        return "fresh"
+    if current_rev == target_rev:
+        return "noop"
+
+    # Reverse-lookup current_rev to determine which version it belongs to,
+    # then compare versions. If current_rev isn't mapped (dev/intermediate
+    # alembic revision) be conservative and forward-migrate rather than risk
+    # an incorrect downgrade.
+    rev_to_version = {rev: ver for ver, rev in _REVISION_HEADS_MAP.items()}
+    current_version = rev_to_version.get(current_rev)
+    if current_version is None:
+        return "forward"
+    if Version(current_version) > Version(target):
+        return "downgrade"
+    return "forward"
+
+
+@retry(
+    stop=stop_after_attempt(5),
+    wait=wait_exponential(multiplier=2, min=2, max=30),
+    retry=retry_if_exception_type(RuntimeError),
+    reraise=True,
+)
+def discover_api_server_pod(namespace: str, release_name: str) -> str:
+    """Return the name of a Running api-server pod for *release_name* in *namespace*.
+
+    Scoped to the release via the ``release`` label so that, when several
+    Airflow releases share a namespace, the downgrade is exec'd strictly in the
+    current release's api-server pod and never another release's.
+
+    Retries on ``RuntimeError`` so a rolling restart of the api-server
+    deployment (no Running pod for a few seconds) does not fail the job.
+    """
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    pods = api.list_namespaced_pod(
+        namespace=namespace,
+        label_selector=f"release={release_name},component=api-server",
+        field_selector="status.phase=Running",
+    ).items
+    if not pods:
+        raise RuntimeError(
+            f"no Running api-server pod found for release {release_name} in namespace {namespace}"
+        )
+
+    # Prefer Ready pods so we don't pick one mid-rollout.
+    ready = [
+        p for p in pods if any(c.type == "Ready" and c.status == "True" for c in (p.status.conditions or []))
+    ]
+    return (ready or pods)[0].metadata.name
+
+
+def run_downgrade_in_api_server(pod_name: str, namespace: str, target_version: str) -> int:
+    """Exec ``airflow db downgrade`` in the api-server pod via the Kubernetes API."""
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    command = ["airflow", "db", "downgrade", "--to-version", target_version, "--yes"]
+
+    resp = stream(
+        api.connect_get_namespaced_pod_exec,
+        pod_name,
+        namespace,
+        container="api-server",
+        command=command,
+        stderr=True,
+        stdin=False,
+        stdout=True,
+        tty=False,
+        _preload_content=False,
+    )
+    while resp.is_open():
+        resp.update(timeout=1)
+        if resp.peek_stdout():
+            sys.stdout.write(resp.read_stdout())
+            sys.stdout.flush()
+        if resp.peek_stderr():
+            sys.stderr.write(resp.read_stderr())
+            sys.stderr.flush()
+    returncode = resp.returncode
+    resp.close()
+    # Treat an unknown exit code as failure rather than success: if the stream
+    # closes without populating returncode we cannot prove the downgrade ran.
+    if returncode is None:
+        sys.stderr.write("[db_migrate] downgrade exec stream closed without an exit code\n")
+        return 1
+    return returncode
+
+
+# Components that talk to the metadata DB and are managed by this helm release.
+# A downgrade-then-scale-back sequence must drain all of them so no OLD code
+# keeps talking to the now-downgraded schema before helm rolls in TARGET pods.
+_DB_TOUCHING_COMPONENTS = (
+    "api-server",
+    "scheduler",
+    "triggerer",
+    "dag-processor",
+    "worker",
+)
+
+
+def scale_release_workloads_to_zero(
+    namespace: str, release_name: str, timeout_seconds: int | None = None
+) -> None:
+    """Scale all DB-touching workloads of this release to 0 and wait for drain.
+
+    Helm will patch the same Deployments/StatefulSets back to ``replicas: N``
+    when it applies the post-hook manifests, so we deliberately do NOT scale
+    them up again here.
+
+    The drain deadline defaults to ``MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS`` (or
+    300s if unset). Operators with long-running worker tasks can raise this
+    via the ``migrateDatabaseJob.drainTimeoutSeconds`` chart value.
+    """
+    if timeout_seconds is None:
+        timeout_seconds = _int_env("MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS", 300)
+    k8s_config.load_incluster_config()
+    apps = client.AppsV1Api()
+    core = client.CoreV1Api()
+
+    selector = f"release={release_name},component in ({','.join(_DB_TOUCHING_COMPONENTS)})"
+    scale_body = {"spec": {"replicas": 0}}
+
+    deployments = apps.list_namespaced_deployment(namespace, label_selector=selector).items
+    statefulsets = apps.list_namespaced_stateful_set(namespace, label_selector=selector).items
+
+    for d in deployments:
+        print(f"[db_migrate] scaling Deployment/{d.metadata.name} to 0", flush=True)
+        apps.patch_namespaced_deployment_scale(d.metadata.name, namespace, scale_body)
+    for s in statefulsets:
+        print(f"[db_migrate] scaling StatefulSet/{s.metadata.name} to 0", flush=True)
+        apps.patch_namespaced_stateful_set_scale(s.metadata.name, namespace, scale_body)
+
+    deadline = time.monotonic() + timeout_seconds
+    while time.monotonic() < deadline:
+        remaining = core.list_namespaced_pod(namespace, label_selector=selector).items
+        if not remaining:
+            print("[db_migrate] all DB-touching pods drained.", flush=True)
+            return
+        print(
+            f"[db_migrate] waiting for {len(remaining)} pod(s) to terminate...",
+            flush=True,
+        )
+        time.sleep(2)
+    raise TimeoutError(f"DB-touching pods did not drain within {timeout_seconds}s after scale-to-zero")
+
+
+def _require_kubernetes_client() -> None:
+    """Fail clearly if the downgrade branch runs on an image without the k8s client.
+
+    The forward / no-op / fresh paths never import kubernetes, so only the
+    downgrade branch needs to assert it is available (see the guarded import
+    at the top of this module).
+    """
+    if KUBERNETES_IMPORT_ERROR is not None:
+        raise SystemExit(
+            "the chart downgrade path needs the 'kubernetes' python client, which is not "
+            "installed in this image. Install the cncf.kubernetes provider, or set "
+            "migrateDatabaseJob.args explicitly to supply your own migration command."
+        )
+
+
+def main() -> int:
+    target = os.environ.get("AIRFLOW_TARGET_VERSION")
+    namespace = os.environ.get("POD_NAMESPACE")
+    release_name = os.environ.get("RELEASE_NAME")
+    if not target:
+        raise SystemExit("AIRFLOW_TARGET_VERSION must be set")
+    if not namespace:
+        raise SystemExit("POD_NAMESPACE must be set")
+
+    # Block until the DB is reachable so that BOTH ``decide_action`` and the
+    # subsequent ``airflow db migrate`` subprocess (which does not go through
+    # ``/entrypoint`` and therefore has no DB-wait of its own) succeed on a
+    # fresh install where the bundled postgres may still be initialising.

Review Comment:
   Code should be self-explanatory. Comments should explain why the code was
   written a certain way (e.g. historical context or an edge case), not what
   it does. In this case it is, at least for me, self-explanatory why we need
   to wait for the DB to be ready. I think that most of the comments in this
   file are not needed and are AI-generated noise, which from my perspective
   makes reviewing harder, because I end up reading the same thing over and
   over again. Because of that, I didn't finish reviewing this file; I will do
   it in the next iteration.
   
   Could you please review all of the comments in this PR and check whether
   they bring any value that cannot be taken from the code itself?



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -279,6 +279,77 @@ def test_should_set_correct_helm_hooks_weight(self):
         annotations = jmespath.search("metadata.annotations", docs[0])
         assert annotations["helm.sh/hook-weight"] == "1"
 
+    def test_default_hooks_run_post_install_pre_upgrade(self):
+        # post-install: the chart's bundled postgres is created as a regular
+        # release resource, so the job needs the DB to exist before it runs.
+        # pre-upgrade: the reconciler must run *before* helm rolls the new
+        # image, so on a downgrade it can exec ``airflow db downgrade`` in the
+        # still-running OLD api-server pod (the OLD image still ships the
+        # reverse alembic scripts; the new one does not).
+        # Tracked in https://github.com/apache/airflow/issues/68072.
+        docs = render_chart(
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        annotations = jmespath.search("metadata.annotations", docs[0])
+        assert annotations["helm.sh/hook"] == "post-install,pre-upgrade"
+
+    def test_default_args_contain_bidirectional_migrate_script(self):
+        docs = render_chart(
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        args = jmespath.search("spec.template.spec.containers[0].args", docs[0])
+        assert args[:2] == ["python", "-c"]
+        # Confirm the embedded script is the bidirectional reconciler.
+        assert "AIRFLOW_TARGET_VERSION" in args[2]
+        assert "airflow db downgrade" in args[2]
+        assert "airflow db migrate" in args[2]
+
+    def test_user_supplied_args_still_win(self):
+        custom_args = ["bash", "-c", "exec airflow db migrate"]
+        docs = render_chart(
+            values={"migrateDatabaseJob": {"args": custom_args}},
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        assert jmespath.search("spec.template.spec.containers[0].args", docs[0]) == custom_args
+
+    def test_user_supplied_command_still_wins(self):
+        custom_command = ["/bin/my-entrypoint"]
+        docs = render_chart(
+            values={"migrateDatabaseJob": {"command": custom_command}},
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) == custom_command
+
+    def test_airflow_target_version_env_uses_airflow_version(self):
+        docs = render_chart(
+            values={"airflowVersion": "3.1.8"},
+            show_only=["templates/jobs/migrate-database-job.yaml"],
+        )
+        env = jmespath.search("spec.template.spec.containers[0].env", docs[0])
+        target_var = next((e for e in env if e["name"] == "AIRFLOW_TARGET_VERSION"), None)
+        assert target_var is not None
+        assert target_var["value"] == "3.1.8"

Review Comment:
   ```suggestion
           assert jmespath.search("spec.template.spec.containers[0].env[?name=='AIRFLOW_TARGET_VERSION'] | [0].value", docs[0]) == "3.1.8"
   ```
   If I remember the notation correctly.
   
   *the same applies to the other tests*



##########
chart/templates/rbac/migrate-database-job-rolebinding.yaml:
##########
@@ -0,0 +1,52 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+##############################################
+## Airflow Migrate Database Job Role Binding
+##############################################
+{{- if and .Values.rbac.create .Values.migrateDatabaseJob.enabled }}

Review Comment:
   Same comment as for the roles.



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -352,16 +423,17 @@ def test_job_ttl_after_finished_nil(self):
         assert "ttlSecondsAfterFinished" not in spec
 
     def test_default_command_and_args_airflow_version(self):
+        # Default args now embed the bidirectional reconciliation script.
+        # The "args still win" / "command still wins" / "script body present"

Review Comment:
   What does it mean?



##########
chart/tests/helm_tests/airflow_aux/test_db_migrate_script.py:
##########
@@ -0,0 +1,554 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Could you please review this file in terms of consistency with the whole 
Helm chart test suite?



##########
chart/templates/rbac/migrate-database-job-rolebinding.yaml:
##########
@@ -0,0 +1,52 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+##############################################
+## Airflow Migrate Database Job Role Binding
+##############################################
+{{- if and .Values.rbac.create .Values.migrateDatabaseJob.enabled }}
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: {{ include "airflow.fullname" . }}-migrate-database-job-rolebinding
+  namespace: "{{ .Release.Namespace }}"
+  labels:
+    tier: airflow
+    component: run-airflow-migrations
+    release: {{ .Release.Name }}
+    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
+    heritage: {{ .Release.Service }}
+    {{- with .Values.labels }}
+      {{- toYaml . | nindent 4 }}
+    {{- end }}
+  {{- if .Values.migrateDatabaseJob.useHelmHooks }}
+  annotations:
+    helm.sh/hook: post-install,pre-upgrade
+    helm.sh/hook-weight: "-5"

Review Comment:
   ```suggestion
       helm.sh/hook-weight: "0"
   ```



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -352,16 +423,17 @@ def test_job_ttl_after_finished_nil(self):
         assert "ttlSecondsAfterFinished" not in spec
 
     def test_default_command_and_args_airflow_version(self):
+        # Default args now embed the bidirectional reconciliation script.
+        # The "args still win" / "command still wins" / "script body present"
+        # cases above cover the override semantics in detail.
         docs = render_chart(
             show_only=["templates/jobs/migrate-database-job.yaml"],
         )
 
         assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) is None
-        assert jmespath.search("spec.template.spec.containers[0].args", docs[0]) == [
-            "bash",
-            "-c",
-            "exec \\\nairflow db migrate",
-        ]
+        args = jmespath.search("spec.template.spec.containers[0].args", docs[0])
+        assert args[:2] == ["python", "-c"]
+        assert "airflow db migrate" in args[2]

Review Comment:
   I think it is better to test the whole content of the command. Otherwise we
   could miss a regression when this changes in the future.



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job.py:
##########
@@ -279,6 +279,77 @@ def test_should_set_correct_helm_hooks_weight(self):
         annotations = jmespath.search("metadata.annotations", docs[0])
         assert annotations["helm.sh/hook-weight"] == "1"
 
+    def test_default_hooks_run_post_install_pre_upgrade(self):
+        # post-install: the chart's bundled postgres is created as a regular
+        # release resource, so the job needs the DB to exist before it runs.
+        # pre-upgrade: the reconciler must run *before* helm rolls the new
+        # image, so on a downgrade it can exec ``airflow db downgrade`` in the
+        # still-running OLD api-server pod (the OLD image still ships the
+        # reverse alembic scripts; the new one does not).
+        # Tracked in https://github.com/apache/airflow/issues/68072.

Review Comment:
   ```suggestion
           # still-running OLD api-server pod.
   ```
   I think that this should be clear for anyone working with Airflow and Helm



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job_rbac.py:
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Should be under the `security` test set, not `airflow_aux`, to keep 
consistency within the tests.



##########
chart/newsfragments/68074.significant.rst:
##########
@@ -0,0 +1,42 @@
+Helm chart now reconciles the Airflow metadata DB bidirectionally on ``helm upgrade``
+
+The ``migrate-database-job`` is now run as a ``post-install,pre-upgrade`` hook
+(was ``post-install,post-upgrade``) and its default args run a reconciliation
+script that picks one of four actions depending on the relationship between
+the chart's ``airflowVersion`` and the alembic head currently in the database:
+
+* ``fresh install`` (no alembic row) — ``airflow db migrate``.
+* ``target == current`` — no-op.
+* ``target > current`` — ``airflow db migrate`` (existing behaviour).
+* ``target < current`` — ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (so reverse alembic
+  scripts that ship only in the source-version image are available),
+  followed by scaling every DB-touching workload of this release
+  (``api-server``, ``scheduler``, ``triggerer``, ``dag-processor``,
+  ``worker``) to ``0``. Helm then patches those workloads back to their
+  configured ``replicas`` with the TARGET image as the upgrade proceeds,
+  so the cluster comes back up cleanly on the target version without any
+  OLD code running against the now-downgraded schema. A downgrade is
+  therefore a brief outage rather than a zero-downtime rolling update —
+  this is unavoidable when the schema goes backwards.
+
+The action above is only decided once the metadata DB is reachable. If the DB
+is still unreachable after the connection-wait window
+(``DB_CONNECT_MAX_WAIT_SECONDS``, default ``120``), the job exits non-zero and
+is retried via the Job ``backoffLimit`` rather than proceeding with a migrate.
+
+A ``Role`` + ``RoleBinding`` granting ``pods`` (``get``, ``list``),
+``pods/exec`` (``get``, ``create``), ``deployments`` / ``statefulsets`` (``get``, ``list``)
+and ``deployments/scale`` / ``statefulsets/scale`` (``patch``) to the
+existing ``migrate-database-job`` ``ServiceAccount`` are now rendered when

Review Comment:
   These are implementation details that belong in the documentation rather
   than in the release notes.



##########
chart/templates/rbac/migrate-database-job-role.yaml:
##########
@@ -0,0 +1,88 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+#######################################
+## Airflow Migrate Database Job Role
+##
+## Grants pods + pods/exec in the release namespace so the migrate-database-job
+## can exec ``airflow db downgrade`` inside the still-running api-server pod on
+## a chart downgrade. The forward-migrate branch never uses these permissions.
+#######################################
+{{- if and .Values.rbac.create .Values.migrateDatabaseJob.enabled }}
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: {{ include "airflow.fullname" . }}-migrate-database-job-role
+  namespace: "{{ .Release.Namespace }}"
+  labels:
+    tier: airflow
+    component: run-airflow-migrations
+    release: {{ .Release.Name }}
+    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
+    heritage: {{ .Release.Service }}
+    {{- with .Values.labels }}
+      {{- toYaml . | nindent 4 }}
+    {{- end }}
+  {{- if .Values.migrateDatabaseJob.useHelmHooks }}
+  annotations:
+    helm.sh/hook: post-install,pre-upgrade
+    helm.sh/hook-weight: "-5"

Review Comment:
   ```suggestion
       helm.sh/hook-weight: "0"
   ```



##########
chart/tests/helm_tests/airflow_aux/test_migrate_database_job_rbac.py:
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Could you please review this file in terms of consistency and test 
duplication within the whole Helm chart test suite?


