jykae commented on code in PR #68074:
URL: https://github.com/apache/airflow/pull/68074#discussion_r3371310814


##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072

Review Comment:
   Done. Dropped the issue reference in the docstring.
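   A minimal sketch of how each action in the docstring's decision table maps to the steps it triggers. `plan_steps` is a hypothetical helper that only describes the steps and runs nothing. Treating `fresh` like `forward` is an assumption, based on the comment in `decide_action` that `airflow db migrate` handles the fresh-install path.

```python
from __future__ import annotations


def plan_steps(action: str, target_version: str) -> list[str]:
    """Describe (not run) the steps for a ``decide_action`` result."""
    if action == "noop":
        # Schema is already at the target head: nothing to do.
        return []
    if action in ("forward", "fresh"):
        # Runs in the migrate job itself with the TARGET image's forward scripts.
        return ["airflow db migrate"]
    if action == "downgrade":
        return [
            # The OLD image in a running api-server pod still ships the reverse scripts.
            f"exec in api-server: airflow db downgrade --to-version {target_version} --yes",
            # Drain DB-touching workloads; helm scales them back up on the TARGET image.
            "scale api-server, scheduler, triggerer, dag-processor, worker to 0",
        ]
    raise ValueError(f"unknown action: {action!r}")
```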



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.

Review Comment:
   Good idea. I'll open a follow-up PR to expose a public accessor and 
update this script to prefer it once it's available.
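   A sketch of the "prefer the public accessor once it exists" fallback, assuming the accessor will be a zero-argument function. `get_revision_heads_map` and `load_revision_heads_map` are hypothetical names; per the NOTE comment in the diff, only the private `_REVISION_HEADS_MAP` is available today. Passing the module in as an argument keeps the sketch testable without an Airflow install.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def load_revision_heads_map(db_module: Any) -> dict[str, str]:
    """Return a version -> alembic head map, preferring a public accessor.

    ``get_revision_heads_map`` is a HYPOTHETICAL accessor name; today only the
    private ``_REVISION_HEADS_MAP`` dict is available in airflow.utils.db.
    """
    accessor = getattr(db_module, "get_revision_heads_map", None)
    if callable(accessor):
        return dict(accessor())
    # Current path: copy the private module-level dict so callers can't mutate it.
    return dict(db_module._REVISION_HEADS_MAP)


# Stand-ins for airflow.utils.db before and after the follow-up PR.
legacy_db = SimpleNamespace(_REVISION_HEADS_MAP={"2.9.0": "rev-a"})
future_db = SimpleNamespace(get_revision_heads_map=lambda: {"2.9.0": "rev-a"})
```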



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+    """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+    target_rev = _REVISION_HEADS_MAP.get(target)
+    if target_rev is None:
+        # Unknown target version (e.g. dev build). Be conservative: forward only.
+        return "forward"
+
+    try:
+        with engine.connect() as conn:
+            current_rev = MigrationContext.configure(conn).get_current_revision()
+    except OperationalError:
+        # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+        # re-surface a real connectivity error with a clearer message.
+        return "fresh"
+
+    if current_rev is None:
+        return "fresh"
+    if current_rev == target_rev:
+        return "noop"
+
+    # Walk the TARGET image's revision graph from base to target_rev. Target's
+    # ScriptDirectory only contains revisions up to target's heads, so we
+    # cannot walk from an unknown current_rev (the downgrade case): the call
+    # would raise ``RevisionError``. Instead, build the ancestor set of target
+    # and check membership.
+    cfg = Config()
+    cfg.set_main_option(
+        "script_location",
+        os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+    )
+    script = ScriptDirectory.from_config(cfg)
+    ancestors_of_target = {rev.revision for rev in script.walk_revisions("base", target_rev)}
+    if current_rev in ancestors_of_target:

Review Comment:
   Good catch. Switched to a `_REVISION_HEADS_MAP` reverse lookup with a 
`packaging.version.Version` comparison. If the current revision isn't in the 
map (dev / intermediate alembic rev) we conservatively forward-migrate rather 
than risk an incorrect downgrade. Dropped the `ScriptDirectory` dependency 
entirely.



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+    """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+    target_rev = _REVISION_HEADS_MAP.get(target)
+    if target_rev is None:
+        # Unknown target version (e.g. dev build). Be conservative: forward only.
+        return "forward"
+
+    try:
+        with engine.connect() as conn:
+            current_rev = MigrationContext.configure(conn).get_current_revision()
+    except OperationalError:
+        # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+        # re-surface a real connectivity error with a clearer message.
+        return "fresh"
+
+    if current_rev is None:
+        return "fresh"
+    if current_rev == target_rev:
+        return "noop"
+
+    # Walk the TARGET image's revision graph from base to target_rev. Target's
+    # ScriptDirectory only contains revisions up to target's heads, so we
+    # cannot walk from an unknown current_rev (the downgrade case): the call
+    # would raise ``RevisionError``. Instead, build the ancestor set of target
+    # and check membership.
+    cfg = Config()
+    cfg.set_main_option(
+        "script_location",
+        os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+    )
+    script = ScriptDirectory.from_config(cfg)
+    ancestors_of_target = {rev.revision for rev in script.walk_revisions("base", target_rev)}
+    if current_rev in ancestors_of_target:
+        # current is an ancestor of target -> moving forward.
+        return "forward"
+    # current is not in target's history -> must be newer than target.
+    return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+    """Return the name of a Running api-server pod in *namespace*."""
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    pods = api.list_namespaced_pod(
+        namespace=namespace,
+        label_selector="component=api-server",
+        field_selector="status.phase=Running",
+    ).items
+    if not pods:
+        raise RuntimeError(f"no Running api-server pod found in namespace {namespace}")
+
+    # Prefer Ready pods so we don't pick one mid-rollout.
+    ready = [
+        p for p in pods if any(c.type == "Ready" and c.status == "True" for c in (p.status.conditions or []))
+    ]
+    return (ready or pods)[0].metadata.name

Review Comment:
   The list comprehension is followed by `(ready or pods)[0]`, and we already 
raise `RuntimeError("no Running api-server pod...")` above if `pods` is 
empty, so the `[0]` index can't go out of bounds. With the new `@retry` 
decorator, a transient empty result is also retried with backoff.
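   To make the bounds argument concrete, a sketch of the selection step with the Kubernetes `V1Pod` objects replaced by small stand-in dataclasses. `Pod` and `Condition` are illustrative, not the client's types, and `pod.name` flattens the real `pod.metadata.name`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Condition:
    type: str
    status: str


@dataclass
class Pod:
    name: str
    conditions: list[Condition] | None = None


def pick_pod_name(pods: list[Pod], namespace: str) -> str:
    """Prefer a Ready pod; fall back to any Running pod."""
    if not pods:
        raise RuntimeError(f"no Running api-server pod found in namespace {namespace}")
    ready = [
        p for p in pods if any(c.type == "Ready" and c.status == "True" for c in (p.conditions or []))
    ]
    # `pods` is non-empty here, so `ready or pods` is non-empty and [0] is safe.
    return (ready or pods)[0].name
```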



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+    """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+    target_rev = _REVISION_HEADS_MAP.get(target)
+    if target_rev is None:
+        # Unknown target version (e.g. dev build). Be conservative: forward only.
+        return "forward"
+
+    try:
+        with engine.connect() as conn:
+            current_rev = MigrationContext.configure(conn).get_current_revision()
+    except OperationalError:
+        # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+        # re-surface a real connectivity error with a clearer message.
+        return "fresh"
+
+    if current_rev is None:
+        return "fresh"
+    if current_rev == target_rev:
+        return "noop"
+
+    # Walk the TARGET image's revision graph from base to target_rev. Target's
+    # ScriptDirectory only contains revisions up to target's heads, so we
+    # cannot walk from an unknown current_rev (the downgrade case): the call
+    # would raise ``RevisionError``. Instead, build the ancestor set of target
+    # and check membership.
+    cfg = Config()
+    cfg.set_main_option(
+        "script_location",
+        os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+    )
+    script = ScriptDirectory.from_config(cfg)
+    ancestors_of_target = {rev.revision for rev in script.walk_revisions("base", target_rev)}
+    if current_rev in ancestors_of_target:
+        # current is an ancestor of target -> moving forward.
+        return "forward"
+    # current is not in target's history -> must be newer than target.
+    return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+    """Return the name of a Running api-server pod in *namespace*."""
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    pods = api.list_namespaced_pod(
+        namespace=namespace,
+        label_selector="component=api-server",
+        field_selector="status.phase=Running",
+    ).items
+    if not pods:
+        raise RuntimeError(f"no Running api-server pod found in namespace {namespace}")
+
+    # Prefer Ready pods so we don't pick one mid-rollout.
+    ready = [
+        p for p in pods if any(c.type == "Ready" and c.status == "True" for c in (p.status.conditions or []))
+    ]
+    return (ready or pods)[0].metadata.name
+
+
+def run_downgrade_in_api_server(pod_name: str, namespace: str, target_version: str) -> int:

Review Comment:
   Added `@retry(stop=stop_after_attempt(5), wait=wait_exponential(...), 
retry=retry_if_exception_type(RuntimeError), reraise=True)`. Added a regression 
test that exercises the transient-empty case.
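   The reply uses tenacity's `@retry`; the sketch below swaps in a stdlib loop to show the same behaviour: retry only on `RuntimeError`, stop after five attempts, back off exponentially, and re-raise the last error (the `reraise=True` semantics). The delay values are illustrative because the actual `wait_exponential(...)` arguments are elided above, and `call_with_retry` is a hypothetical name. Injecting `sleep` keeps tests fast.

```python
from __future__ import annotations

import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
    retry_on: type[BaseException] = RuntimeError,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call ``fn``, retrying on ``retry_on`` with capped exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                # Like tenacity's reraise=True: surface the original exception.
                raise
            # 1s, 2s, 4s, ... capped at max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")
```

Any exception other than `retry_on` propagates immediately without sleeping, so a real API error is not masked by retries.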



##########
chart/files/db_migrate.py:
##########
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Bidirectional Airflow metadata DB reconciliation for the helm chart.
+
+Decides at runtime whether the helm release wants a forward migrate, a
+downgrade, or a no-op, and runs the right command:
+
+* target == current  -> no-op (idempotent check)
+* target  > current  -> ``airflow db migrate`` inside this job's container
+  (uses the TARGET image, which ships forward scripts).
+* target  < current  -> ``airflow db downgrade --to-version <target>``
+  executed inside the still-running api-server pod (the OLD image still
+  ships the reverse scripts), followed by scaling every DB-touching
+  workload (api-server, scheduler, triggerer, dag-processor, worker) to
+  zero so that no OLD pod keeps talking to the now-downgraded schema. Helm
+  then patches those workloads back to ``replicas: N`` with the TARGET
+  image as the upgrade proceeds, so the cluster comes back up cleanly on
+  the target version. This means a downgrade trades the otherwise-broken
+  rolling-update window for a brief outage (which is unavoidable when the
+  schema goes backwards).
+
+Required env:
+
+* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to.
+* ``POD_NAMESPACE`` - release namespace, injected via downward API.
+* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to
+  only the workloads owned by this release.
+
+Reference: https://github.com/apache/airflow/issues/68072
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+from alembic.config import Config
+from alembic.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from kubernetes import client, config as k8s_config
+from kubernetes.stream import stream
+from sqlalchemy.exc import OperationalError
+
+import airflow
+from airflow.settings import engine
+
+# NOTE: _REVISION_HEADS_MAP is a private symbol in airflow.utils.db. Tracked in
+# #68072 to expose a public accessor; using the private name is the only way
+# today to map a target version string to an alembic revision.
+from airflow.utils.db import _REVISION_HEADS_MAP
+
+
+def decide_action(target: str) -> str:
+    """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``."""
+    target_rev = _REVISION_HEADS_MAP.get(target)
+    if target_rev is None:
+        # Unknown target version (e.g. dev build). Be conservative: forward only.
+        return "forward"
+
+    try:
+        with engine.connect() as conn:
+            current_rev = MigrationContext.configure(conn).get_current_revision()
+    except OperationalError:
+        # DB unreachable -> treat as fresh install; ``airflow db migrate`` will
+        # re-surface a real connectivity error with a clearer message.
+        return "fresh"
+
+    if current_rev is None:
+        return "fresh"
+    if current_rev == target_rev:
+        return "noop"
+
+    # Walk the TARGET image's revision graph from base to target_rev. Target's
+    # ScriptDirectory only contains revisions up to target's heads, so we
+    # cannot walk from an unknown current_rev (the downgrade case): the call
+    # would raise ``RevisionError``. Instead, build the ancestor set of target
+    # and check membership.
+    cfg = Config()
+    cfg.set_main_option(
+        "script_location",
+        os.path.join(os.path.dirname(airflow.__file__), "migrations"),
+    )
+    script = ScriptDirectory.from_config(cfg)
+    ancestors_of_target = {rev.revision for rev in script.walk_revisions("base", target_rev)}
+    if current_rev in ancestors_of_target:
+        # current is an ancestor of target -> moving forward.
+        return "forward"
+    # current is not in target's history -> must be newer than target.
+    return "downgrade"
+
+
+def discover_api_server_pod(namespace: str) -> str:
+    """Return the name of a Running api-server pod in *namespace*."""
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    pods = api.list_namespaced_pod(
+        namespace=namespace,
+        label_selector="component=api-server",
+        field_selector="status.phase=Running",
+    ).items
+    if not pods:
+        raise RuntimeError(f"no Running api-server pod found in namespace {namespace}")
+
+    # Prefer Ready pods so we don't pick one mid-rollout.
+    ready = [
+        p for p in pods if any(c.type == "Ready" and c.status == "True" for c in (p.status.conditions or []))
+    ]
+    return (ready or pods)[0].metadata.name
+
+
+def run_downgrade_in_api_server(pod_name: str, namespace: str, target_version: str) -> int:
+    """Exec ``airflow db downgrade`` in the api-server pod via the Kubernetes API."""
+    k8s_config.load_incluster_config()
+    api = client.CoreV1Api()
+    command = ["airflow", "db", "downgrade", "--to-version", target_version, "--yes"]
+
+    resp = stream(
+        api.connect_get_namespaced_pod_exec,
+        pod_name,
+        namespace,
+        container="api-server",
+        command=command,
+        stderr=True,
+        stdin=False,
+        stdout=True,
+        tty=False,
+        _preload_content=False,
+    )
+    while resp.is_open():
+        resp.update(timeout=1)
+        if resp.peek_stdout():
+            sys.stdout.write(resp.read_stdout())
+            sys.stdout.flush()
+        if resp.peek_stderr():
+            sys.stderr.write(resp.read_stderr())
+            sys.stderr.flush()
+    returncode = resp.returncode
+    resp.close()
+    # Treat an unknown exit code as failure rather than success: if the stream
+    # closes without populating returncode we cannot prove the downgrade ran.
+    if returncode is None:
+        sys.stderr.write("[db_migrate] downgrade exec stream closed without an exit code\n")
+        return 1
+    return returncode
+
+
+# Components that talk to the metadata DB and are managed by this helm release.
+# A downgrade-then-scale-back sequence must drain all of them so no OLD code
+# keeps talking to the now-downgraded schema before helm rolls in TARGET pods.
+_DB_TOUCHING_COMPONENTS = (
+    "api-server",
+    "scheduler",
+    "triggerer",
+    "dag-processor",
+    "worker",
+)
+
+
+def scale_release_workloads_to_zero(namespace: str, release_name: str, timeout_seconds: int = 300) -> None:
+    """Scale all DB-touching workloads of this release to 0 and wait for drain.
+
+    Helm will patch the same Deployments/StatefulSets back to ``replicas: N``
+    when it applies the post-hook manifests, so we deliberately do NOT scale
+    them up again here.
+    """
+    k8s_config.load_incluster_config()
+    apps = client.AppsV1Api()
+    core = client.CoreV1Api()
+
+    selector = f"release={release_name},component in ({','.join(_DB_TOUCHING_COMPONENTS)})"
+    scale_body = {"spec": {"replicas": 0}}
+
+    deployments = apps.list_namespaced_deployment(namespace, label_selector=selector).items
+    statefulsets = apps.list_namespaced_stateful_set(namespace, label_selector=selector).items
+
+    for d in deployments:
+        print(f"[db_migrate] scaling Deployment/{d.metadata.name} to 0", flush=True)
+        apps.patch_namespaced_deployment_scale(d.metadata.name, namespace, scale_body)
+    for s in statefulsets:
+        print(f"[db_migrate] scaling StatefulSet/{s.metadata.name} to 0", flush=True)
+        apps.patch_namespaced_stateful_set_scale(s.metadata.name, namespace, scale_body)
+
+    deadline = time.monotonic() + timeout_seconds
+    while time.monotonic() < deadline:
+        remaining = core.list_namespaced_pod(namespace, label_selector=selector).items
+        if not remaining:
+            print("[db_migrate] all DB-touching pods drained.", flush=True)
+            return
+        print(
+            f"[db_migrate] waiting for {len(remaining)} pod(s) to terminate...",
+            flush=True,
+        )
+        time.sleep(2)
+    raise TimeoutError(f"DB-touching pods did not drain within {timeout_seconds}s after scale-to-zero")

Review Comment:
   Made it configurable via `migrateDatabaseJob.drainTimeoutSeconds` (default 
300), plumbed in as the `MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS` env var. Operators 
with long-running worker tasks can raise it. I agree that workers will likely 
die once the api-server is scaled down, but the value gives a clean knob to 
tune.
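   A sketch of how the script side might read the new knob, assuming the env var is optional and falls back to the chart default of 300 seconds; the parsing and validation rules here are assumptions, not taken from the PR. `wait_for_drain` is a hypothetical helper that mirrors the polling loop in `scale_release_workloads_to_zero`, with the pod listing, clock, and sleep injected so the timeout path can be tested without a cluster.

```python
from __future__ import annotations

import os
import time
from collections.abc import Callable, Mapping

DEFAULT_DRAIN_TIMEOUT_SECONDS = 300  # chart default stated in the reply


def drain_timeout_from_env(env: Mapping[str, str] = os.environ) -> int:
    """Read MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS, falling back to the default."""
    raw = env.get("MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS", "").strip()
    if not raw:
        return DEFAULT_DRAIN_TIMEOUT_SECONDS
    value = int(raw)  # a non-integer raises ValueError and fails the job loudly
    if value <= 0:
        raise ValueError(f"MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS must be positive, got {value}")
    return value


def wait_for_drain(
    count_remaining: Callable[[], int],
    timeout_seconds: int,
    poll_seconds: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll until no pods remain, or raise TimeoutError after timeout_seconds."""
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        if count_remaining() == 0:
            return
        sleep(poll_seconds)
    raise TimeoutError(f"DB-touching pods did not drain within {timeout_seconds}s after scale-to-zero")
```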



##########
chart/templates/jobs/migrate-database-job.yaml:
##########
@@ -105,6 +105,16 @@ spec:
           {{- end }}
           {{- if .Values.migrateDatabaseJob.args }}
           args: {{- tpl (toYaml .Values.migrateDatabaseJob.args) . | nindent 12 }}
+          {{- else }}
+          # Bidirectional metadata DB reconciliation: forward migrate, no-op, or
+          # exec downgrade in the still-running api-server pod depending on the
+          # relationship between the chart's target version and the DB's current
+          # alembic head. See https://github.com/apache/airflow/issues/68072.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.