This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 977935d14b4 airflow-ctl-tests: wait for Dag run terminal state before
xcom commands (#67065)
977935d14b4 is described below
commit 977935d14b4e8a2d094b7edbc2563b9d56641058
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun May 17 21:51:02 2026 +0200
airflow-ctl-tests: wait for Dag run terminal state before xcom commands
(#67065)
The airflowctl xcom integration tests are flaky on ARM CI: xcom add plus
xcom get and xcom list succeed, then xcom edit and xcom delete fail with
"The XCom with key ... doesn't exist". Same SHA passes on some ARM runs
and fails on others.
Root cause is a race between the test's xcom add and execution of
runme_0. When the task transitions to RUNNING, the execution API
queries every XCom key for that task instance and tells the worker to
clear them via xcom_keys_to_clear (see
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
and task-sdk/src/airflow/sdk/execution_time/task_runner.py). If the
xcom add lands before the task starts running -- a window of only a
few seconds for the bash echo task in example_bash_operator -- the
user-added XCom is wiped after xcom get / xcom list but before
xcom edit / xcom delete run.
Wait for the targeted Dag run to reach a terminal state before each
xcom command. Once the task is terminal it won't run again, so the
clear logic doesn't fire and the user-added XCom survives the rest of
the xcom commands.
---
.../tests/airflowctl_tests/conftest.py | 72 +++++++++++++++++++++-
1 file changed, 69 insertions(+), 3 deletions(-)
diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
index cbcea44ec36..31a3696a161 100644
--- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
+++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
@@ -16,9 +16,13 @@
# under the License.
from __future__ import annotations
+import json
import os
+import re
import subprocess
import sys
+import time
+from subprocess import PIPE, STDOUT, Popen
import pytest
import requests
@@ -35,6 +39,65 @@ from airflowctl_tests.constants import (
from tests_common.test_utils.fernet import generate_fernet_key_string
+# XCom add/edit/delete race against task execution: when the target task
transitions to
+# RUNNING, the execution API tells the worker to clear every XCom key
currently stored
+# for that task instance (see `xcom_keys_to_clear` in
+#
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py).
Any
+# XCom the test just added through airflowctl is wiped, and the next xcom
edit/delete
+# command then fails with "XCom doesn't exist". Waiting for the Dag run to
reach a
+# terminal state means the task has already run (and won't run again), so
user-added
+# XComs survive the rest of the xcom commands.
+_XCOM_TARGET_PATTERN =
re.compile(r'^xcom\s+(?:add|get|list|edit|delete)\s+(\S+)\s+"(manual__[^"]+)"')
+_DAG_RUN_TERMINAL_STATES = frozenset({"success", "failed"})
+
+
+def _airflowctl_dag_run_state(dag_id: str, dag_run_id: str, env_vars: dict,
skip_login: bool) -> str | None:
+ """Return the current state of a Dag run via airflowctl, or None if
unparsable."""
+ host_envs = os.environ.copy()
+ host_envs.update(env_vars)
+
+ get_cmd = f'airflowctl dagrun get {dag_id} "{dag_run_id}" -o json'
+ if not skip_login:
+ get_cmd = f"airflowctl {LOGIN_COMMAND} && {get_cmd}"
+
+ proc = Popen(get_cmd.encode(), stdout=PIPE, stderr=STDOUT, shell=True,
env=host_envs)
+ try:
+ out, _ = proc.communicate(timeout=20)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ return None
+ out_str = out.decode()
+ if LOGIN_OUTPUT in out_str:
+ out_str = out_str.split(f"{LOGIN_OUTPUT}\n", 1)[-1].strip()
+ start, end = out_str.find("{"), out_str.rfind("}")
+ if start == -1 or end == -1:
+ return None
+ try:
+ return json.loads(out_str[start : end + 1]).get("state")
+ except json.JSONDecodeError:
+ return None
+
+
+def _wait_for_dag_run_terminal_state(
+ dag_id: str,
+ dag_run_id: str,
+ env_vars: dict,
+ skip_login: bool,
+ timeout: int = 60,
+) -> None:
+ """Block until the Dag run reaches success/failed, or raise
TimeoutError."""
+ deadline = time.monotonic() + timeout
+ last_state: str | None = None
+ while time.monotonic() < deadline:
+ last_state = _airflowctl_dag_run_state(dag_id, dag_run_id, env_vars,
skip_login)
+ if last_state in _DAG_RUN_TERMINAL_STATES:
+ return
+ time.sleep(1)
+ raise TimeoutError(
+ f"Dag run {dag_id}/{dag_run_id} did not reach terminal state in
{timeout}s "
+ f"(last seen state: {last_state})"
+ )
+
@pytest.fixture(scope="module")
def api_token():
@@ -56,9 +119,6 @@ def run_command():
"""Fixture that provides a helper to run airflowctl commands."""
def _run_command(command: str, env_vars: dict, skip_login: bool = False)
-> str:
- import os
- from subprocess import PIPE, STDOUT, Popen
-
host_envs = os.environ.copy()
host_envs.update(env_vars)
@@ -70,6 +130,12 @@ def run_command():
else:
run_cmd = command_from_config
+ # See `_XCOM_TARGET_PATTERN` above for why xcom commands have to wait
for the
+ # Dag run to be terminal before running.
+ xcom_match = _XCOM_TARGET_PATTERN.match(command)
+ if xcom_match:
+ _wait_for_dag_run_terminal_state(xcom_match.group(1),
xcom_match.group(2), env_vars, skip_login)
+
console.print(f"[yellow]Running command: {command}")
# Give some time for the command to execute and output to be ready