This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 247551da1d0 Add CI duration trend monitor to warn on slow main builds (#68368)
247551da1d0 is described below

commit 247551da1d069ff6f2c4d32a5f2dc739f44d71a4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Jun 14 01:39:44 2026 +0200

    Add CI duration trend monitor to warn on slow main builds (#68368)
    
    Adds an automated warning when CI run times on `main` creep above the
    recent trend, so slowdowns are caught early instead of at the timeout
    cliff (as recently happened with the MySQL tests).
    
    A daily scheduled workflow runs a new analysis script that fetches the
    recent `schedule`-event runs of `ci-amd.yml` on `main` (the post-merge
    canaries), computes the wall-clock duration of each run and each job,
    and compares the latest run against a robust median baseline of the
    preceding runs. A regression is only flagged when the increase clears
    both a relative threshold (default +25%) and an absolute floor, so
    short, noisy jobs do not raise spurious alerts. When a regression is
    detected it posts to the internal-airflow-ci-cd Slack channel; otherwise
    it stays quiet.
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .github/workflows/ci-duration-monitor.yml         |  69 +++
 scripts/ci/analyze_ci_job_durations.py            | 586 ++++++++++++++++++++++
 scripts/tests/ci/test_analyze_ci_job_durations.py | 310 ++++++++++++
 3 files changed, 965 insertions(+)

diff --git a/.github/workflows/ci-duration-monitor.yml b/.github/workflows/ci-duration-monitor.yml
new file mode 100644
index 00000000000..bc0f093203a
--- /dev/null
+++ b/.github/workflows/ci-duration-monitor.yml
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "CI Duration Monitor"
+on:  # yamllint disable-line rule:truthy
+  schedule:
+    # A few hours after the daily scheduled CI canaries, so the latest main run is in the window.
+    - cron: '0 9 * * *'
+  workflow_dispatch:
+permissions:
+  contents: read
+  actions: read
+env:
+  GITHUB_REPOSITORY: ${{ github.repository }}
+  GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+  SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
+
+jobs:
+
+  monitor-ci-durations:
+    name: "Monitor CI durations on main"
+    runs-on: ubuntu-latest
+    steps:
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd  # v6.0.2
+        with:
+          persist-credentials: false
+
+      - name: "Analyze CI job durations"
+        id: analyze
+        run: python3 scripts/ci/analyze_ci_job_durations.py
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          # main coverage comes from the scheduled canary runs of the AMD workflow.
+          WORKFLOW_NAME: "ci-amd.yml"
+          BRANCH: "main"
+          MAX_RUNS: "25"
+          OUTPUT_FILE: "slack-message.json"
+
+      - name: "Post duration alert to Slack"
+        if: steps.analyze.outputs.has-regression == 'true'
+        uses: slackapi/slack-github-action@45a88b9581bfab2566dc881e2cd66d334e621e2c  # v3.0.3
+        with:
+          method: chat.postMessage
+          token: ${{ env.SLACK_BOT_TOKEN }}
+          payload-file-path: "slack-message.json"
+
+      - name: "Upload duration analysis"
+        uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a  # v7.0.1
+        with:
+          name: "ci-duration-analysis"
+          path: slack-message.json
+          retention-days: 14
+        if: always() && steps.analyze.outputs.has-regression == 'true'
diff --git a/scripts/ci/analyze_ci_job_durations.py b/scripts/ci/analyze_ci_job_durations.py
new file mode 100644
index 00000000000..6cc512f645b
--- /dev/null
+++ b/scripts/ci/analyze_ci_job_durations.py
@@ -0,0 +1,586 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# ///
+"""
+Watch CI job durations on ``main`` and warn when they creep above the recent trend.
+
+Motivation: CI run times can drift upwards slowly (new tests, slower runners,
+queue pressure) and nobody notices until a job starts timing out. This script
+fetches the recent completed runs of the main CI workflow, computes the wall-clock
+duration of each run and of each individual job, and compares the latest run(s)
+against a robust baseline built from the preceding runs. When the latest duration
+is meaningfully above that baseline it emits a warning so regressions are caught
+early rather than at the timeout cliff.
+
+The baseline is the *median* of the preceding window (robust to the odd slow run),
+and a regression is only flagged when the latest median exceeds the baseline by
+both a relative margin (``REL_THRESHOLD``) and an absolute floor
+(``MIN_ABS_INCREASE_MINUTES`` / ``JOB_MIN_ABS_INCREASE_MINUTES``) so short jobs
+with noisy timings do not trigger spurious alerts.
+
+Environment variables (required):
+  GITHUB_REPOSITORY  - Owner/repo (e.g. apache/airflow)
+  GITHUB_TOKEN       - GitHub token for API access (used by ``gh``)
+
+Environment variables (optional):
+  WORKFLOW_NAME             - Workflow file to query (default: ci-amd.yml)
+  BRANCH                    - Branch to filter runs (default: main)
+  EVENT                     - Trigger event to restrict runs to, e.g. "schedule"; the main
+                              post-merge canaries. Empty string = all events (default: schedule)
+  MAX_RUNS                  - Window of completed runs to analyze (default: 25)
+  LATEST_RUNS               - Most-recent runs compared against the baseline (default: 1)
+  MIN_BASELINE_RUNS         - Minimum baseline runs needed to compute a trend (default: 5)
+  REL_THRESHOLD             - Relative increase over baseline to flag, e.g. 0.25 = 25% (default: 0.25)
+  MIN_ABS_INCREASE_MINUTES  - Absolute floor for the overall-run alert (default: 5)
+  JOB_MIN_ABS_INCREASE_MINUTES - Absolute floor for per-job alerts (default: 3)
+  ANALYZE_JOBS              - Whether to fetch per-job durations ("true"/"false", default: true)
+  ONLY_SUCCESSFUL           - Only consider runs that concluded "success" (default: true)
+  SLACK_CHANNEL             - Slack channel for the message payload (default: internal-airflow-ci-cd)
+  OUTPUT_FILE               - Path for the Slack message output (default: slack-message.json)
+  GITHUB_OUTPUT             - Path to GitHub Actions output file
+  GITHUB_STEP_SUMMARY       - Path to GitHub Actions step summary file
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import subprocess
+import sys
+from datetime import datetime
+from pathlib import Path
+
+ISO_SUFFIX_Z = "Z"
+
+
+def env_float(name: str, default: float) -> float:
+    """Read a float environment variable, falling back to ``default`` when unset/invalid."""
+    raw = os.environ.get(name)
+    if raw is None or raw.strip() == "":
+        return default
+    try:
+        return float(raw)
+    except ValueError:
+        print(f"Invalid float for {name}={raw!r}; using default {default}", file=sys.stderr)
+        return default
+
+
+def env_int(name: str, default: int) -> int:
+    """Read an int environment variable, falling back to ``default`` when unset/invalid."""
+    raw = os.environ.get(name)
+    if raw is None or raw.strip() == "":
+        return default
+    try:
+        return int(raw)
+    except ValueError:
+        print(f"Invalid int for {name}={raw!r}; using default {default}", file=sys.stderr)
+        return default
+
+
+def env_bool(name: str, default: bool) -> bool:
+    """Read a boolean environment variable ("true"/"false")."""
+    raw = os.environ.get(name)
+    if raw is None or raw.strip() == "":
+        return default
+    return raw.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def escape_slack_mrkdwn(text: str) -> str:
+    """Escape special characters for Slack mrkdwn format."""
+    text = text.replace("&", "&amp;")
+    text = text.replace("<", "&lt;")
+    text = text.replace(">", "&gt;")
+    return text
+
+
+def gh_api(endpoint: str, **kwargs: str) -> str | None:
+    """Call GitHub API via gh CLI.
+
+    Forces ``--method GET``: ``gh api`` defaults to POST whenever ``-f``
+    parameters are present, which makes read-only endpoints (such as the
+    workflow runs list) return 404.
+    """
+    cmd = ["gh", "api", "--method", "GET", endpoint]
+    for key, value in kwargs.items():
+        cmd.extend(["-f", f"{key}={value}"])
+    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
+    if result.returncode != 0:
+        print(f"gh api error for {endpoint}: {result.stderr}", file=sys.stderr)
+        return None
+    return result.stdout.strip()
+
+
+def parse_iso(timestamp: str | None) -> datetime | None:
+    """Parse an ISO-8601 timestamp (with trailing ``Z``) to an aware datetime."""
+    if not timestamp:
+        return None
+    try:
+        # ``datetime.fromisoformat`` only learned to parse the ``Z`` suffix in 3.11.
+        if timestamp.endswith(ISO_SUFFIX_Z):
+            timestamp = timestamp[:-1] + "+00:00"
+        return datetime.fromisoformat(timestamp)
+    except ValueError:
+        return None
+
+
+def duration_seconds(start: str | None, end: str | None) -> float | None:
+    """Return the number of seconds between two ISO timestamps, or None if unparsable."""
+    start_dt = parse_iso(start)
+    end_dt = parse_iso(end)
+    if start_dt is None or end_dt is None:
+        return None
+    seconds = (end_dt - start_dt).total_seconds()
+    if seconds < 0:
+        return None
+    return seconds
+
+
+def median(values: list[float]) -> float:
+    """Return the median of a non-empty list of values."""
+    ordered = sorted(values)
+    n = len(ordered)
+    mid = n // 2
+    if n % 2 == 1:
+        return ordered[mid]
+    return (ordered[mid - 1] + ordered[mid]) / 2
+
+
+def format_duration(seconds: float) -> str:
+    """Format a duration in seconds as e.g. ``29m 41s``."""
+    total = int(round(seconds))
+    minutes, secs = divmod(total, 60)
+    if minutes == 0:
+        return f"{secs}s"
+    return f"{minutes}m {secs:02d}s"
+
+
+# Wall-clock shorter than this almost always means a run that was cancelled,
+# skipped by selective checks, or never really executed the test matrix — not a
+# representative "main build". Such runs would corrupt the duration baseline.
+MIN_VALID_RUN_SECONDS = 120
+
+
+def get_recent_runs(
+    repo: str, workflow: str, branch: str, max_runs: int, only_successful: bool, event: str
+) -> list[dict]:
+    """Get recent completed workflow runs (newest first) with timing metadata.
+
+    ``event`` (e.g. ``schedule``) restricts the result to a single trigger type so
+    that PR runs targeting ``main`` are not mixed in with the post-merge canary
+    runs. Pass an empty string to include every event.
+    """
+    # Over-fetch when filtering to success so we still end up with ~max_runs usable runs.
+    per_page = max_runs * 2 if only_successful else max_runs
+    per_page = min(per_page, 100)
+    params = {
+        "branch": branch,
+        "per_page": str(per_page),
+        "status": "completed",
+    }
+    if event:
+        params["event"] = event
+    output = gh_api(f"repos/{repo}/actions/workflows/{workflow}/runs", **params)
+    if not output:
+        return []
+    try:
+        data = json.loads(output)
+    except json.JSONDecodeError:
+        return []
+
+    runs: list[dict] = []
+    for run in data.get("workflow_runs", []):
+        if only_successful and run.get("conclusion") != "success":
+            continue
+        seconds = duration_seconds(run.get("run_started_at"), run.get("updated_at"))
+        if seconds is None or seconds < MIN_VALID_RUN_SECONDS:
+            continue
+        runs.append(
+            {
+                "id": run["id"],
+                "run_number": run.get("run_number", "?"),
+                "created_at": run.get("created_at", ""),
+                "conclusion": run.get("conclusion", "unknown"),
+                "event": run.get("event", "unknown"),
+                "html_url": run.get("html_url", ""),
+                "duration": seconds,
+            }
+        )
+        if len(runs) >= max_runs:
+            break
+    return runs
+
+
+def get_run_jobs(repo: str, run_id: int) -> dict[str, float]:
+    """Return a mapping of job name -> duration in seconds for a single run.
+
+    Only jobs that completed successfully are included, so that a job which was
+    cancelled or skipped on a particular run does not pollute its duration trend.
+    """
+    result = subprocess.run(
+        ["gh", "run", "view", str(run_id), "--repo", repo, "--json", "jobs"],
+        capture_output=True,
+        text=True,
+        check=False,
+    )
+    if result.returncode != 0:
+        print(f"Could not fetch jobs for run {run_id}: {result.stderr}", file=sys.stderr)
+        return {}
+    try:
+        data = json.loads(result.stdout)
+    except json.JSONDecodeError:
+        return {}
+
+    durations: dict[str, float] = {}
+    for job in data.get("jobs", []):
+        if job.get("conclusion") != "success":
+            continue
+        seconds = duration_seconds(job.get("startedAt"), job.get("completedAt"))
+        if seconds is None:
+            continue
+        name = job.get("name", "unknown")
+        # A matrix can surface the same job name more than once per run; keep the longest.
+        durations[name] = max(durations.get(name, 0.0), seconds)
+    return durations
+
+
+def detect_regression(
+    latest_values: list[float],
+    baseline_values: list[float],
+    rel_threshold: float,
+    min_abs_increase_seconds: float,
+) -> dict | None:
+    """Compare latest durations against a baseline window.
+
+    Returns a dict describing the regression when the latest median is above the
+    baseline median by both the relative threshold and the absolute floor, else None.
+    """
+    if not latest_values or not baseline_values:
+        return None
+    latest = median(latest_values)
+    baseline = median(baseline_values)
+    increase = latest - baseline
+    rel_increase = increase / baseline if baseline > 0 else 0.0
+    if increase >= min_abs_increase_seconds and rel_increase >= rel_threshold:
+        return {
+            "latest": latest,
+            "baseline": baseline,
+            "increase": increase,
+            "rel_increase": rel_increase,
+        }
+    return None
+
+
+def analyze_jobs(
+    repo: str,
+    latest_runs: list[dict],
+    baseline_runs: list[dict],
+    min_baseline_runs: int,
+    rel_threshold: float,
+    min_abs_increase_seconds: float,
+) -> list[dict]:
+    """Fetch per-job durations and return the jobs whose latest duration regressed."""
+    latest_job_durations: dict[str, list[float]] = {}
+    for run in latest_runs:
+        for name, seconds in get_run_jobs(repo, run["id"]).items():
+            latest_job_durations.setdefault(name, []).append(seconds)
+
+    baseline_job_durations: dict[str, list[float]] = {}
+    for run in baseline_runs:
+        for name, seconds in get_run_jobs(repo, run["id"]).items():
+            baseline_job_durations.setdefault(name, []).append(seconds)
+
+    regressions: list[dict] = []
+    for name, latest_values in latest_job_durations.items():
+        baseline_values = baseline_job_durations.get(name, [])
+        if len(baseline_values) < min_baseline_runs:
+            continue
+        regression = detect_regression(
+            latest_values, baseline_values, rel_threshold, min_abs_increase_seconds
+        )
+        if regression:
+            regression["job"] = name
+            regressions.append(regression)
+
+    regressions.sort(key=lambda r: r["rel_increase"], reverse=True)
+    return regressions
+
+
+def format_slack_message(
+    repo: str,
+    workflow: str,
+    branch: str,
+    overall_regression: dict | None,
+    job_regressions: list[dict],
+    recent_runs: list[dict],
+    rel_threshold: float,
+    channel: str,
+) -> dict:
+    """Format the regression report as a Slack Block Kit message."""
+    blocks: list[dict] = [
+        {
+            "type": "header",
+            "text": {"type": "plain_text", "text": "⏱️ CI Duration Trend Alert"},
+        },
+        {
+            "type": "section",
+            "text": {
+                "type": "mrkdwn",
+                "text": (
+                    f"CI run times on *{escape_slack_mrkdwn(branch)}* "
+                    f"(`{escape_slack_mrkdwn(workflow)}`) have risen above the recent trend "
+                    f"(baseline = median of the preceding runs; threshold = "
+                    f"+{int(rel_threshold * 100)}%)."
+                ),
+            },
+        },
+        {"type": "divider"},
+    ]
+
+    if overall_regression:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": (
+                        "*Overall run wall-clock regressed:*\n"
+                        f"• Latest: *{format_duration(overall_regression['latest'])}*\n"
+                        f"• Baseline: {format_duration(overall_regression['baseline'])}\n"
+                        f"• Increase: *+{format_duration(overall_regression['increase'])}* "
+                        f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)"
+                    ),
+                },
+            }
+        )
+
+    if job_regressions:
+        lines = ["*Jobs that got slower:*"]
+        for reg in job_regressions[:15]:
+            lines.append(
+                f"• *{escape_slack_mrkdwn(reg['job'])}* — "
+                f"{format_duration(reg['baseline'])} → *{format_duration(reg['latest'])}* "
+                f"(+{round(reg['rel_increase'] * 100, 1)}%)"
+            )
+        text = "\n".join(lines)
+        if len(text) > 2900:
+            text = text[:2900] + "\n_...truncated_"
+        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": text}})
+        if len(job_regressions) > 15:
+            blocks.append(
+                {
+                    "type": "context",
+                    "elements": [
+                        {
+                            "type": "mrkdwn",
+                            "text": f"_...and {len(job_regressions) - 15} more slower jobs_",
+                        }
+                    ],
+                }
+            )
+
+    if recent_runs:
+        latest_run = recent_runs[0]
+        blocks.append({"type": "divider"})
+        blocks.append(
+            {
+                "type": "context",
+                "elements": [
+                    {
+                        "type": "mrkdwn",
+                        "text": (
+                            f"<https://github.com/{repo}/actions/workflows/{workflow}"
+                            f"?query=branch%3A{branch}|View {escape_slack_mrkdwn(workflow)} runs> | "
+                            f"Latest: <{latest_run['html_url']}|Run #{latest_run['run_number']}> "
+                            f"({format_duration(latest_run['duration'])})"
+                        ),
+                    }
+                ],
+            }
+        )
+
+    fallback_parts = []
+    if overall_regression:
+        fallback_parts.append(f"overall +{round(overall_regression['rel_increase'] * 100, 1)}%")
+    if job_regressions:
+        fallback_parts.append(f"{len(job_regressions)} slower job(s)")
+    fallback = f"CI Duration Trend Alert on {branch}: " + ", ".join(fallback_parts)
+
+    return {
+        "channel": channel,
+        "text": fallback,
+        "blocks": blocks,
+    }
+
+
+def write_step_summary(
+    workflow: str,
+    branch: str,
+    overall_regression: dict | None,
+    job_regressions: list[dict],
+    recent_runs: list[dict],
+    baseline_count: int,
+) -> None:
+    """Write a GitHub Actions step summary in markdown."""
+    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
+    if not summary_path:
+        return
+
+    lines = [
+        "## ⏱️ CI Duration Trend",
+        "",
+        f"Workflow `{workflow}` on `{branch}` — baseline from {baseline_count} preceding runs.",
+        "",
+    ]
+
+    if overall_regression:
+        lines += [
+            "### ⚠️ Overall run regressed",
+            "",
+            f"- Latest: **{format_duration(overall_regression['latest'])}**",
+            f"- Baseline: {format_duration(overall_regression['baseline'])}",
+            f"- Increase: **+{format_duration(overall_regression['increase'])}** "
+            f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)",
+            "",
+        ]
+    else:
+        lines += ["### ✅ Overall run within trend", ""]
+
+    if job_regressions:
+        lines += [
+            "### ⚠️ Slower jobs",
+            "",
+            "| Job | Baseline | Latest | Increase |",
+            "|-----|----------|--------|----------|",
+        ]
+        for reg in job_regressions[:25]:
+            lines.append(
+                f"| {reg['job']} | {format_duration(reg['baseline'])} | "
+                f"{format_duration(reg['latest'])} | +{round(reg['rel_increase'] * 100, 1)}% |"
+            )
+        lines.append("")
+    else:
+        lines += ["### ✅ No individual job regressed", ""]
+
+    if recent_runs:
+        lines += [
+            "### Recent run durations",
+            "",
+            "| Run | Event | Duration |",
+            "|-----|-------|----------|",
+        ]
+        for run in recent_runs[:15]:
+            lines.append(
+                f"| [#{run['run_number']}]({run['html_url']}) | {run['event']} | "
+                f"{format_duration(run['duration'])} |"
+            )
+        lines.append("")
+
+    with open(summary_path, "a") as f:
+        f.write("\n".join(lines))
+
+
+def main() -> None:
+    repo = os.environ.get("GITHUB_REPOSITORY", "apache/airflow")
+    workflow = os.environ.get("WORKFLOW_NAME", "ci-amd.yml")
+    branch = os.environ.get("BRANCH", "main")
+    event = os.environ.get("EVENT", "schedule")
+    max_runs = env_int("MAX_RUNS", 25)
+    latest_runs_count = env_int("LATEST_RUNS", 1)
+    min_baseline_runs = env_int("MIN_BASELINE_RUNS", 5)
+    rel_threshold = env_float("REL_THRESHOLD", 0.25)
+    min_abs_increase_seconds = env_float("MIN_ABS_INCREASE_MINUTES", 5.0) * 60
+    job_min_abs_increase_seconds = env_float("JOB_MIN_ABS_INCREASE_MINUTES", 3.0) * 60
+    do_analyze_jobs = env_bool("ANALYZE_JOBS", True)
+    only_successful = env_bool("ONLY_SUCCESSFUL", True)
+    channel = os.environ.get("SLACK_CHANNEL", "internal-airflow-ci-cd")
+    output_file = Path(os.environ.get("OUTPUT_FILE", "slack-message.json"))
+
+    event_label = event or "all events"
+    print(f"Analyzing CI durations for {repo} ({workflow} on {branch}, event={event_label})")
+    print(f"Window: up to {max_runs} completed runs; latest {latest_runs_count} vs baseline.")
+
+    runs = get_recent_runs(repo, workflow, branch, max_runs, only_successful, event)
+    if len(runs) < latest_runs_count + min_baseline_runs:
+        print(
+            f"Not enough runs to establish a trend "
+            f"(found {len(runs)}, need {latest_runs_count + min_baseline_runs}). Skipping."
+        )
+        _write_outputs(False, False, 0)
+        sys.exit(0)
+
+    latest_runs = runs[:latest_runs_count]
+    baseline_runs = runs[latest_runs_count:]
+    print(f"Latest runs: {len(latest_runs)}; baseline runs: {len(baseline_runs)}.")
+
+    overall_regression = detect_regression(
+        [r["duration"] for r in latest_runs],
+        [r["duration"] for r in baseline_runs],
+        rel_threshold,
+        min_abs_increase_seconds,
+    )
+    if overall_regression:
+        print(
+            f"Overall regression: {format_duration(overall_regression['baseline'])} -> "
+            f"{format_duration(overall_regression['latest'])} "
+            f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)"
+        )
+    else:
+        print("Overall run duration is within the recent trend.")
+
+    job_regressions: list[dict] = []
+    if do_analyze_jobs:
+        job_regressions = analyze_jobs(
+            repo,
+            latest_runs,
+            baseline_runs,
+            min_baseline_runs,
+            rel_threshold,
+            job_min_abs_increase_seconds,
+        )
+        print(f"Jobs that regressed: {len(job_regressions)}")
+
+    has_regression = bool(overall_regression) or bool(job_regressions)
+
+    if has_regression:
+        slack_message = format_slack_message(
+            repo, workflow, branch, overall_regression, job_regressions, runs, rel_threshold, channel
+        )
+        output_file.write_text(json.dumps(slack_message, indent=2))
+        print(f"Slack message written to: {output_file}")
+    else:
+        print("No regression detected; no Slack message written.")
+
+    write_step_summary(workflow, branch, overall_regression, job_regressions, runs, len(baseline_runs))
+    _write_outputs(has_regression, bool(overall_regression), len(job_regressions))
+
+
+def _write_outputs(has_regression: bool, overall_regression: bool, regressed_jobs: int) -> None:
+    """Write GitHub Actions outputs used to gate the Slack-notify step."""
+    github_output = os.environ.get("GITHUB_OUTPUT")
+    if not github_output:
+        return
+    with open(github_output, "a") as f:
+        f.write(f"has-regression={str(has_regression).lower()}\n")
+        f.write(f"overall-regression={str(overall_regression).lower()}\n")
+        f.write(f"regressed-jobs={regressed_jobs}\n")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/tests/ci/test_analyze_ci_job_durations.py b/scripts/tests/ci/test_analyze_ci_job_durations.py
new file mode 100644
index 00000000000..432fefe44a5
--- /dev/null
+++ b/scripts/tests/ci/test_analyze_ci_job_durations.py
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+import json
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+MODULE_PATH = Path(__file__).resolve().parents[3] / "scripts" / "ci" / "analyze_ci_job_durations.py"
+
+
+@pytest.fixture
+def durations_module():
+    module_name = "test_analyze_ci_job_durations_module"
+    sys.modules.pop(module_name, None)
+    spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
+    assert spec is not None
+    assert spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+class TestGhApi:
+    def test_forces_get_method(self, durations_module):
+        """`gh api` defaults to POST when -f is passed; we must force GET to avoid 404."""
+        completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="")
+        with patch.object(subprocess, "run", return_value=completed) as mock_run:
+            durations_module.gh_api("repos/apache/airflow/actions/workflows/x/runs", branch="main")
+        args = mock_run.call_args[0][0]
+        assert "--method" in args
+        assert args[args.index("--method") + 1] == "GET"
+
+
+class TestParseIso:
+    def test_parses_z_suffix(self, durations_module):
+        dt = durations_module.parse_iso("2026-06-10T13:01:53Z")
+        assert dt is not None
+        assert dt.year == 2026 and dt.hour == 13
+
+    def test_parses_offset(self, durations_module):
+        dt = durations_module.parse_iso("2026-06-10T13:01:53+00:00")
+        assert dt is not None
+
+    def test_none_for_empty(self, durations_module):
+        assert durations_module.parse_iso("") is None
+        assert durations_module.parse_iso(None) is None
+
+    def test_none_for_garbage(self, durations_module):
+        assert durations_module.parse_iso("not-a-date") is None
+
+
+class TestDurationSeconds:
+    def test_computes_positive_duration(self, durations_module):
+        seconds = durations_module.duration_seconds("2026-06-10T13:00:00Z", "2026-06-10T13:29:00Z")
+        assert seconds == 29 * 60
+
+    def test_none_when_unparsable(self, durations_module):
+        assert durations_module.duration_seconds("bad", "2026-06-10T13:29:00Z") is None
+
+    def test_none_when_negative(self, durations_module):
+        """A clock skew / out-of-order pair must not produce a negative duration."""
+        assert durations_module.duration_seconds("2026-06-10T13:29:00Z", "2026-06-10T13:00:00Z") is None
+
+
+class TestMedian:
+    def test_odd(self, durations_module):
+        assert durations_module.median([3, 1, 2]) == 2
+
+    def test_even(self, durations_module):
+        assert durations_module.median([1, 2, 3, 4]) == 2.5
+
+
+class TestFormatDuration:
+    def test_minutes_and_seconds(self, durations_module):
+        assert durations_module.format_duration(29 * 60 + 41) == "29m 41s"
+
+    def test_seconds_only(self, durations_module):
+        assert durations_module.format_duration(45) == "45s"
+
+    def test_zero_pads_seconds(self, durations_module):
+        assert durations_module.format_duration(60 + 5) == "1m 05s"
+
+
+class TestDetectRegression:
+    def test_flags_regression_above_both_thresholds(self, durations_module):
+        # baseline median ~1800s (30m), latest 2700s (45m) -> +50%, +15m
+        regression = durations_module.detect_regression(
+            latest_values=[2700],
+            baseline_values=[1800, 1810, 1790, 1805, 1795],
+            rel_threshold=0.25,
+            min_abs_increase_seconds=300,
+        )
+        assert regression is not None
+        assert regression["latest"] == 2700
+        assert round(regression["rel_increase"], 2) == 0.5
+
+    def test_no_regression_below_relative_threshold(self, durations_module):
+        # +5% only — under the 25% relative threshold even though absolute is large
+        regression = durations_module.detect_regression(
+            latest_values=[6300],
+            baseline_values=[6000, 6000, 6000, 6000, 6000],
+            rel_threshold=0.25,
+            min_abs_increase_seconds=300,
+        )
+        assert regression is None
+
+    def test_no_regression_below_absolute_floor(self, durations_module):
+        # +50% relative but only +60s absolute — under the 300s floor (noisy short job)
+        regression = durations_module.detect_regression(
+            latest_values=[180],
+            baseline_values=[120, 120, 120, 120, 120],
+            rel_threshold=0.25,
+            min_abs_increase_seconds=300,
+        )
+        assert regression is None
+
+    def test_robust_to_single_baseline_outlier(self, durations_module):
+        # One slow baseline run should not move the median enough to mask a real regression.
+        regression = durations_module.detect_regression(
+            latest_values=[2700],
+            baseline_values=[1800, 1800, 1800, 1800, 5000],
+            rel_threshold=0.25,
+            min_abs_increase_seconds=300,
+        )
+        assert regression is not None
+
+    def test_empty_inputs(self, durations_module):
+        assert durations_module.detect_regression([], [1, 2], 0.25, 300) is None
+        assert durations_module.detect_regression([1], [], 0.25, 300) is None
+
+
+class TestGetRecentRuns:
+    def _runs_payload(self):
+        return json.dumps(
+            {
+                "workflow_runs": [
+                    {
+                        "id": 2,
+                        "run_number": 102,
+                        "conclusion": "success",
+                        "event": "schedule",
+                        "html_url": "https://example/2",
+                        "run_started_at": "2026-06-10T13:00:00Z",
+                        "updated_at": "2026-06-10T13:45:00Z",
+                    },
+                    {
+                        "id": 1,
+                        "run_number": 101,
+                        "conclusion": "failure",
+                        "event": "schedule",
+                        "html_url": "https://example/1",
+                        "run_started_at": "2026-06-09T13:00:00Z",
+                        "updated_at": "2026-06-09T13:30:00Z",
+                    },
+                    {
+                        # A cancelled/skipped run with a near-zero wall-clock must be dropped
+                        # so it cannot drag the baseline down.
+                        "id": 3,
+                        "run_number": 103,
+                        "conclusion": "success",
+                        "event": "schedule",
+                        "html_url": "https://example/3",
+                        "run_started_at": "2026-06-10T14:00:00Z",
+                        "updated_at": "2026-06-10T14:00:30Z",
+                    },
+                ]
+            }
+        )
+
+    def test_filters_to_successful_and_computes_duration(self, durations_module):
+        with patch.object(durations_module, "gh_api", return_value=self._runs_payload()):
+            runs = durations_module.get_recent_runs(
+                "apache/airflow", "ci-amd.yml", "main", max_runs=25, only_successful=True, event="schedule"
+            )
+        # Only the 45-minute successful run survives: the failure and the 30s run are dropped.
+        assert len(runs) == 1
+        assert runs[0]["id"] == 2
+        assert runs[0]["duration"] == 45 * 60
+
+    def test_includes_failures_when_not_filtering(self, durations_module):
+        with patch.object(durations_module, "gh_api", return_value=self._runs_payload()):
+            runs = durations_module.get_recent_runs(
+                "apache/airflow", "ci-amd.yml", "main", max_runs=25, only_successful=False, event="schedule"
+            )
+        # The failure is kept, but the 30s run is still dropped as non-representative.
+        assert len(runs) == 2
+        assert {r["id"] for r in runs} == {1, 2}
+
+    def test_passes_event_to_api(self, durations_module):
+        with patch.object(durations_module, "gh_api", return_value=self._runs_payload()) as mock_api:
+            durations_module.get_recent_runs(
+                "apache/airflow", "ci-amd.yml", "main", max_runs=25, only_successful=True, event="schedule"
+            )
+        assert mock_api.call_args.kwargs.get("event") == "schedule"
+
+    def test_omits_event_when_empty(self, durations_module):
+        with patch.object(durations_module, "gh_api", return_value=self._runs_payload()) as mock_api:
+            durations_module.get_recent_runs(
+                "apache/airflow", "ci-amd.yml", "main", max_runs=25, only_successful=True, event=""
+            )
+        assert "event" not in mock_api.call_args.kwargs
+
+    def test_empty_on_api_failure(self, durations_module):
+        with patch.object(durations_module, "gh_api", return_value=None):
+            runs = durations_module.get_recent_runs(
+                "apache/airflow", "ci-amd.yml", "main", max_runs=25, only_successful=True, event="schedule"
+            )
+        assert runs == []
+
+
+class TestGetRunJobs:
+    def test_parses_successful_jobs(self, durations_module):
+        payload = json.dumps(
+            {
+                "jobs": [
+                    {
+                        "name": "Tests",
+                        "conclusion": "success",
+                        "startedAt": "2026-06-10T13:00:00Z",
+                        "completedAt": "2026-06-10T13:20:00Z",
+                    },
+                    {
+                        "name": "Skipped job",
+                        "conclusion": "skipped",
+                        "startedAt": "2026-06-10T13:00:00Z",
+                        "completedAt": "2026-06-10T13:00:00Z",
+                    },
+                ]
+            }
+        )
+        completed = subprocess.CompletedProcess(args=[], returncode=0, stdout=payload, stderr="")
+        with patch.object(subprocess, "run", return_value=completed):
+            jobs = durations_module.get_run_jobs("apache/airflow", 2)
+        assert jobs == {"Tests": 20 * 60}
+
+    def test_empty_on_command_failure(self, durations_module):
+        completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="boom")
+        with patch.object(subprocess, "run", return_value=completed):
+            assert durations_module.get_run_jobs("apache/airflow", 2) == {}
+
+
+class TestAnalyzeJobs:
+    def test_reports_only_regressed_jobs_with_enough_baseline(self, durations_module):
+        latest_runs = [{"id": 100}]
+        baseline_runs = [{"id": i} for i in range(5)]
+
+        def fake_jobs(_repo, run_id):
+            if run_id == 100:
+                return {"slow-job": 2700, "stable-job": 600, "new-job": 999}
+            # baseline runs
+            return {"slow-job": 1800, "stable-job": 590}
+
+        with patch.object(durations_module, "get_run_jobs", side_effect=fake_jobs):
+            regressions = durations_module.analyze_jobs(
+                "apache/airflow",
+                latest_runs,
+                baseline_runs,
+                min_baseline_runs=5,
+                rel_threshold=0.25,
+                min_abs_increase_seconds=180,
+            )
+        names = [r["job"] for r in regressions]
+        # slow-job regressed; stable-job did not; new-job lacks baseline samples
+        assert names == ["slow-job"]
+
+
+class TestFormatSlackMessage:
+    def test_includes_channel_and_blocks(self, durations_module):
+        msg = durations_module.format_slack_message(
+            repo="apache/airflow",
+            workflow="ci-amd.yml",
+            branch="main",
+            overall_regression={
+                "latest": 2700,
+                "baseline": 1800,
+                "increase": 900,
+                "rel_increase": 0.5,
+            },
+            job_regressions=[
+                {"job": "Tests", "latest": 1500, "baseline": 1000, "increase": 500, "rel_increase": 0.5}
+            ],
+            recent_runs=[{"run_number": 102, "html_url": "https://example/2", "duration": 2700}],
+            rel_threshold=0.25,
+            channel="internal-airflow-ci-cd",
+        )
+        assert msg["channel"] == "internal-airflow-ci-cd"
+        assert any(b["type"] == "header" for b in msg["blocks"])
+        text_blob = json.dumps(msg)
+        assert "Tests" in text_blob
+        assert "main" in msg["text"]


Reply via email to