This is an automated email from the ASF dual-hosted git repository.

asorokoumov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/otava.git


The following commit(s) were added to refs/heads/master by this push:
     new a68bc98  Add e2e Graphite test (#106)
a68bc98 is described below

commit a68bc98cfafd32432b52f6125a4f1b311d2c6601
Author: Alex Sorokoumov <[email protected]>
AuthorDate: Sat Dec 13 10:31:16 2025 -0800

    Add e2e Graphite test (#106)
    
    * Add e2e Graphite test
    
    * Extract common functions into e2e_test_utils.py
---
 tests/csv_e2e_test.py      |   5 +-
 tests/e2e_test_utils.py    | 177 ++++++++++++++++++++++++++++++++++++++
 tests/graphite_e2e_test.py | 210 +++++++++++++++++++++++++++++++++++++++++++++
 tests/postgres_e2e_test.py | 171 +++++++++++++-----------------------
 4 files changed, 449 insertions(+), 114 deletions(-)

diff --git a/tests/csv_e2e_test.py b/tests/csv_e2e_test.py
index d3b5624..33d80f4 100644
--- a/tests/csv_e2e_test.py
+++ b/tests/csv_e2e_test.py
@@ -24,6 +24,7 @@ from datetime import datetime, timedelta, timezone
 from pathlib import Path
 
 import pytest
+from e2e_test_utils import _remove_trailing_whitespaces
 
 
 def test_analyze_csv():
@@ -206,7 +207,3 @@ def test_regressions_csv():
             )
 
         assert _remove_trailing_whitespaces(proc.stdout) == expected_output.rstrip("\n")
-
-
-def _remove_trailing_whitespaces(s: str) -> str:
-    return "\n".join(line.rstrip() for line in s.splitlines()).strip()
diff --git a/tests/e2e_test_utils.py b/tests/e2e_test_utils.py
new file mode 100644
index 0000000..439dcac
--- /dev/null
+++ b/tests/e2e_test_utils.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import shutil
+import socket
+import subprocess
+import time
+from contextlib import contextmanager
+from typing import Callable
+
+import pytest
+
+
+@contextmanager
+def container(
+    image: str,
+    *,
+    env: dict[str, str] | None = None,
+    ports: list[int] | None = None,
+    volumes: dict[str, str] | None = None,
+    readiness_check: Callable[[str, dict[int, int]], bool] | None = None,
+):
+    """
+    Generic context manager for running a Docker container.
+
+    Args:
+        image: Docker image to run (e.g., "postgres:latest").
+        env: Optional dict of environment variables to set in the container.
+        ports: Optional list of container ports to publish (will be mapped to random host ports).
+        volumes: Optional dict mapping host paths to container paths for volume mounts.
+        readiness_check: Optional callable that takes (container_id, port_map) and returns True
+                         when the container is ready. port_map maps container ports to host ports.
+                         If not provided, the container is considered ready once all ports accept
+                         TCP connections.
+
+    Yields:
+        A tuple of (container_id, port_map) where port_map is a dict mapping container ports
+        to their assigned host ports.
+    """
+    if not shutil.which("docker"):
+        pytest.fail("docker is not available on PATH")
+
+    container_id = None
+    try:
+        # Build docker run command
+        cmd = ["docker", "run", "-d"]
+
+        # Add environment variables
+        if env:
+            for key, value in env.items():
+                cmd.extend(["--env", f"{key}={value}"])
+
+        # Add volume mounts
+        if volumes:
+            for host_path, container_path in volumes.items():
+                cmd.extend(["--volume", f"{host_path}:{container_path}"])
+
+        # Add port mappings
+        if ports:
+            for port in ports:
+                cmd.extend(["--publish", str(port)])
+
+        cmd.append(image)
+
+        # Start the container
+        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
+        if proc.returncode != 0:
+            pytest.fail(
+                "Docker command returned non-zero exit code.\n\n"
+                f"Command: {cmd!r}\n"
+                f"Exit code: {proc.returncode}\n\n"
+                f"Stdout:\n{proc.stdout}\n\n"
+                f"Stderr:\n{proc.stderr}\n"
+            )
+        container_id = proc.stdout.strip()
+
+        # Get assigned host ports for each container port
+        port_map: dict[int, int] = {}
+        if ports:
+            for port in ports:
+                inspect_cmd = [
+                    "docker",
+                    "inspect",
+                    "-f",
+                    f'{{{{ (index (index .NetworkSettings.Ports "{port}/tcp") 0).HostPort }}}}',
+                    container_id,
+                ]
+                inspect_proc = subprocess.run(
+                    inspect_cmd, capture_output=True, text=True, timeout=60
+                )
+                if inspect_proc.returncode != 0:
+                    pytest.fail(
+                        "Docker inspect returned non-zero exit code.\n\n"
+                        f"Command: {inspect_cmd!r}\n"
+                        f"Exit code: {inspect_proc.returncode}\n\n"
+                        f"Stdout:\n{inspect_proc.stdout}\n\n"
+                        f"Stderr:\n{inspect_proc.stderr}\n"
+                    )
+                port_map[port] = int(inspect_proc.stdout.strip())
+
+        # Wait for readiness
+        deadline = time.time() + 60
+        ready = False
+        while time.time() < deadline:
+            # First check that all ports accept TCP connections
+            all_ports_ready = True
+            for host_port in port_map.values():
+                try:
+                    with socket.create_connection(("localhost", host_port), timeout=1):
+                        pass
+                except OSError:
+                    all_ports_ready = False
+                    break
+
+            if not all_ports_ready:
+                time.sleep(1)
+                continue
+
+            # If a custom readiness check is provided, use it
+            if readiness_check is not None:
+                if readiness_check(container_id, port_map):
+                    ready = True
+                    break
+                time.sleep(1)
+            else:
+                # No custom check, ports being open is sufficient
+                ready = True
+                break
+
+        if not ready:
+            pytest.fail("Container did not become ready within timeout.")
+
+        yield container_id, port_map
+    finally:
+        if container_id:
+            res = subprocess.run(
+                ["docker", "stop", container_id], capture_output=True, text=True, timeout=60
+            )
+            if res.returncode != 0:
+                pytest.fail(
+                    f"Docker stop returned non-zero exit code: {res.returncode}\n"
+                    f"Stdout: {res.stdout}\nStderr: {res.stderr}"
+                )
+            subprocess.run(
+                ["docker", "rm", container_id], capture_output=True, text=True, timeout=60
+            )
+
+
+@contextmanager
+def graphite_container():
+    """
+    Context manager for running a Graphite container with seeded data.
+    Yields the Graphite HTTP port and ensures cleanup on exit.
+    """
+    with container(
+        "graphiteapp/graphite-statsd",
+        ports=[80, 2003],
+    ) as (container_id, port_map):
+        yield str(port_map[80])
+
+
+def _remove_trailing_whitespaces(s: str) -> str:
+    return "\n".join(line.rstrip() for line in s.splitlines())
diff --git a/tests/graphite_e2e_test.py b/tests/graphite_e2e_test.py
new file mode 100644
index 0000000..96a2d99
--- /dev/null
+++ b/tests/graphite_e2e_test.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import socket
+import subprocess
+import time
+import urllib.request
+from pathlib import Path
+
+import pytest
+from e2e_test_utils import _remove_trailing_whitespaces, container
+
+CARBON_PORT = 2003
+HTTP_PORT = 80
+
+
+def test_analyze_graphite():
+    """
+    End-to-end test for the Graphite example from docs/GRAPHITE.md.
+
+    Starts Graphite docker container, writes sample data, then runs otava analyze,
+    and verifies the output contains expected change points.
+    """
+    with container(
+        "graphiteapp/graphite-statsd",
+        ports=[HTTP_PORT, CARBON_PORT],
+        readiness_check=_graphite_readiness_check,
+    ) as (container_id, port_map):
+        # Seed data into Graphite using the same pattern as datagen.sh
+        data_points = _seed_graphite_data(port_map[CARBON_PORT])
+
+        # Wait for data to be written and available
+        _wait_for_graphite_data(
+            http_port=port_map[HTTP_PORT],
+            metric_path="performance-tests.daily.my-product.client.throughput",
+            expected_points=data_points,
+        )
+
+        # Run the Otava analysis
+        proc = subprocess.run(
+            ["uv", "run", "otava", "analyze", "my-product.test", "--since=-10m"],
+            capture_output=True,
+            text=True,
+            timeout=600,
+            env=dict(
+                os.environ,
+                OTAVA_CONFIG=str(Path("examples/graphite/config/otava.yaml")),
+                GRAPHITE_ADDRESS=f"http://localhost:{port_map[HTTP_PORT]}/",
+                GRAFANA_ADDRESS="http://localhost:3000/",
+                GRAFANA_USER="admin",
+                GRAFANA_PASSWORD="admin",
+            ),
+        )
+
+        if proc.returncode != 0:
+            pytest.fail(
+                "Command returned non-zero exit code.\n\n"
+                f"Command: {proc.args!r}\n"
+                f"Exit code: {proc.returncode}\n\n"
+                f"Stdout:\n{proc.stdout}\n\n"
+                f"Stderr:\n{proc.stderr}\n"
+            )
+
+        # Verify output contains expected columns and change point indicators
+        output = _remove_trailing_whitespaces(proc.stdout)
+
+        # Check that the header contains expected column names
+        assert "throughput" in output
+        assert "response_time" in output
+        assert "cpu_usage" in output
+
+        # Data shows throughput dropped from ~61k to ~57k (-5.6%) and cpu increased from 0.2 to 0.8 (+300%)
+        assert "-5.6%" in output  # throughput change
+        assert "+300.0%" in output  # cpu_usage change
+
+
+def _graphite_readiness_check(container_id: str, port_map: dict[int, int]) -> bool:
+    """
+    Check if Graphite is fully ready by writing a canary metric and verifying it's queryable.
+
+    This ensures both Carbon (write path) and Graphite-web (read path) are operational.
+    """
+    carbon_port = port_map[CARBON_PORT]
+    http_port = port_map[HTTP_PORT]
+
+    # Send a canary metric to Carbon
+    timestamp = int(time.time())
+    canary_metrics = "test.canary.readiness"
+    message = f"{canary_metrics} 1 {timestamp}\n"
+    try:
+        with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
+            sock.sendall(message.encode("utf-8"))
+    except OSError:
+        return False
+
+    # Check if the canary metric is queryable via Graphite-web
+    url = f"http://localhost:{http_port}/render?target={canary_metrics}&format=json&from=-1min"
+    try:
+        with urllib.request.urlopen(url, timeout=5) as response:
+            data = json.loads(response.read().decode("utf-8"))
+            if data and len(data) > 0:
+                datapoints = data[0].get("datapoints", [])
+                # Check if we have at least one non-null data point
+                if any(dp[0] is not None for dp in datapoints):
+                    return True
+    except (urllib.error.URLError, json.JSONDecodeError, OSError):
+        pass
+
+    return False
+
+
+def _seed_graphite_data(carbon_port: int) -> int:
+    """
+    Seed Graphite with test data matching the pattern from examples/graphite/datagen/datagen.sh.
+
+    Data pattern (from newest to oldest, matching datagen.sh array order):
+    - throughput: 56950, 57980, 57123, 60960, 60160, 61160 (index 0 is newest)
+    - response_time (p50): 85, 87, 88, 89, 85, 87
+    - cpu_usage: 0.7, 0.9, 0.8, 0.1, 0.3, 0.2
+
+    When displayed chronologically (oldest to newest), this shows:
+    - throughput dropped from ~61k to ~57k (-5.6% regression)
+    - cpu increased from 0.2 to 0.8 (+300% regression)
+    """
+    throughput_path = "performance-tests.daily.my-product.client.throughput"
+    throughput_values = [56950, 57980, 57123, 60960, 60160, 61160]
+
+    p50_path = "performance-tests.daily.my-product.client.p50"
+    p50_values = [85, 87, 88, 89, 85, 87]
+
+    cpu_path = "performance-tests.daily.my-product.server.cpu"
+    cpu_values = [0.7, 0.9, 0.8, 0.1, 0.3, 0.2]
+
+    start_timestamp = int(time.time())
+    num_points = len(throughput_values)
+
+    for i in range(num_points):
+        # Data is sent from newest to oldest (same as datagen.sh)
+        timestamp = start_timestamp - (i * 60)
+        _send_to_graphite(carbon_port, throughput_path, throughput_values[i], timestamp)
+        _send_to_graphite(carbon_port, p50_path, p50_values[i], timestamp)
+        _send_to_graphite(carbon_port, cpu_path, cpu_values[i], timestamp)
+    return num_points
+
+
+def _send_to_graphite(carbon_port: int, path: str, value: float, timestamp: int):
+    """
+    Send a single metric to Graphite via the Carbon plaintext protocol.
+    """
+    message = f"{path} {value} {timestamp}\n"
+    try:
+        with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
+            sock.sendall(message.encode("utf-8"))
+    except OSError as e:
+        pytest.fail(f"Failed to send metric to Graphite: {e}")
+
+
+def _wait_for_graphite_data(
+    http_port: int,
+    metric_path: str,
+    expected_points: int,
+    timeout: float = 120,
+    poll_interval: float = 0.5,
+) -> None:
+    """
+    Wait for Graphite to have the expected data points available.
+
+    Polls the Graphite render API until the specified metric has at least
+    the expected number of non-null data points, or until the timeout expires.
+    """
+    url = f"http://localhost:{http_port}/render?target={metric_path}&format=json&from=-10min"
+    deadline = time.time() + timeout
+
+    last_observed_count = 0
+    while time.time() < deadline:
+        try:
+            with urllib.request.urlopen(url, timeout=5) as response:
+                data = json.loads(response.read().decode("utf-8"))
+                if data and len(data) > 0:
+                    datapoints = data[0].get("datapoints", [])
+                    # Count non-null values
+                    non_null_count = sum(1 for dp in datapoints if dp[0] is not None)
+                    last_observed_count = non_null_count
+                    if non_null_count >= expected_points:
+                        return
+        except (urllib.error.URLError, json.JSONDecodeError, OSError):
+            pass  # Retry on connection errors
+
+        time.sleep(poll_interval)
+
+    pytest.fail(
+        f"Timeout waiting for Graphite data. "
        f"Expected {expected_points} points for metric '{metric_path}' within {timeout}s, got {last_observed_count}"
+    )
diff --git a/tests/postgres_e2e_test.py b/tests/postgres_e2e_test.py
index e14de8d..e45fa39 100644
--- a/tests/postgres_e2e_test.py
+++ b/tests/postgres_e2e_test.py
@@ -16,15 +16,14 @@
 # under the License.
 
 import os
-import shutil
-import socket
 import subprocess
 import textwrap
-import time
 from contextlib import contextmanager
 from pathlib import Path
+from typing import Callable
 
 import pytest
+from e2e_test_utils import _remove_trailing_whitespaces, container
 
 
 def test_analyze():
@@ -36,7 +35,10 @@ def test_analyze():
     container, and compares stdout to the expected output (seeded data uses
     deterministic 2025 timestamps).
     """
-    with postgres_container() as (postgres_container_id, host_port):
+    username = "exampleuser"
+    password = "examplepassword"
+    db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
         # Run the Otava analysis
         proc = subprocess.run(
             ["uv", "run", "otava", "analyze", "aggregate_mem"],
@@ -48,9 +50,9 @@ def test_analyze():
                 OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
                 POSTGRES_HOSTNAME="localhost",
                 POSTGRES_PORT=host_port,
-                POSTGRES_USERNAME="exampleuser",
-                POSTGRES_PASSWORD="examplepassword",
-                POSTGRES_DATABASE="benchmark_results",
+                POSTGRES_USERNAME=username,
+                POSTGRES_PASSWORD=password,
+                POSTGRES_DATABASE=db,
                 BRANCH="trunk",
             ),
         )
@@ -131,7 +133,11 @@ def test_analyze_and_update_postgres():
     container, and compares stdout to the expected output (seeded data uses
     deterministic 2025 timestamps).
     """
-    with postgres_container() as (postgres_container_id, host_port):
+
+    username = "exampleuser"
+    password = "examplepassword"
+    db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
         # Run the Otava analysis
         proc = subprocess.run(
             ["uv", "run", "otava", "analyze", "aggregate_mem", "--update-postgres"],
@@ -143,9 +149,9 @@ def test_analyze_and_update_postgres():
                 OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
                 POSTGRES_HOSTNAME="localhost",
                 POSTGRES_PORT=host_port,
-                POSTGRES_USERNAME="exampleuser",
-                POSTGRES_PASSWORD="examplepassword",
-                POSTGRES_DATABASE="benchmark_results",
+                POSTGRES_USERNAME=username,
+                POSTGRES_PASSWORD=password,
+                POSTGRES_DATABASE=db,
                 BRANCH="trunk",
             ),
         )
@@ -233,7 +239,10 @@ def test_regressions():
     waits for Postgres to be ready, runs the otava regressions command,
     and compares stdout to the expected output.
     """
-    with postgres_container() as (postgres_container_id, host_port):
+    username = "exampleuser"
+    password = "examplepassword"
+    db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
         # Run the Otava regressions command
         proc = subprocess.run(
             ["uv", "run", "otava", "regressions", "aggregate_mem"],
@@ -245,9 +254,9 @@ def test_regressions():
                 OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
                 POSTGRES_HOSTNAME="localhost",
                 POSTGRES_PORT=host_port,
-                POSTGRES_USERNAME="exampleuser",
-                POSTGRES_PASSWORD="examplepassword",
-                POSTGRES_DATABASE="benchmark_results",
+                POSTGRES_USERNAME=username,
+                POSTGRES_PASSWORD=password,
+                POSTGRES_DATABASE=db,
                 BRANCH="trunk",
             ),
         )
@@ -310,106 +319,48 @@ def test_regressions():
         assert forward_change == backward_change == p_value == ""
 
 
-@contextmanager
-def postgres_container():
-    """
-    Context manager for running a PostgreSQL container.
-    Yields the container ID and ensures cleanup on exit.
-    """
-    if not shutil.which("docker"):
-        pytest.fail("docker is not available on PATH")
+def _postgres_readiness_check_f(
+    username: str, database: str
+) -> Callable[[str, dict[int, int]], bool]:
+    """Check if PostgreSQL is ready to accept connections."""
 
-    container_id = None
-    try:
-        # Start postgres container
+    def _inner(
+        container_id: str,
+        port_map: dict[int, int],
+    ) -> bool:
         cmd = [
             "docker",
-            "run",
-            "-d",
-            "--env",
-            "POSTGRES_USER=exampleuser",
-            "--env",
-            "POSTGRES_PASSWORD=examplepassword",
-            "--env",
-            "POSTGRES_DB=benchmark_results",
-            "--volume",
-            f"{Path('examples/postgresql/init-db').resolve()}:/docker-entrypoint-initdb.d",
-            "--publish",
-            "5432",
-            "postgres:latest",
-        ]
-        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
-        if proc.returncode != 0:
-            pytest.fail(
-                "Docker command returned non-zero exit code.\n\n"
-                f"Command: {cmd!r}\n"
-                f"Exit code: {proc.returncode}\n\n"
-                f"Stdout:\n{proc.stdout}\n\n"
-                f"Stderr:\n{proc.stderr}\n"
-            )
-        container_id = proc.stdout.strip()
-        # Determine the randomly assigned host port for 5432/tcp
-        inspect_cmd = [
-            "docker",
-            "inspect",
-            "-f",
-            '{{ (index (index .NetworkSettings.Ports "5432/tcp") 0).HostPort }}',
+            "exec",
             container_id,
+            "pg_isready",
+            "-U",
+            username,
+            "-d",
+            database,
         ]
-        inspect_proc = subprocess.run(inspect_cmd, capture_output=True, text=True, timeout=60)
-        if inspect_proc.returncode != 0:
-            pytest.fail(
-                "Docker inspect returned non-zero exit code.\n\n"
-                f"Command: {inspect_cmd!r}\n"
-                f"Exit code: {inspect_proc.returncode}\n\n"
-                f"Stdout:\n{inspect_proc.stdout}\n\n"
-                f"Stderr:\n{inspect_proc.stderr}\n"
-            )
-        host_port = inspect_proc.stdout.strip()
-
-        # Wait until Postgres responds
-        deadline = time.time() + 60
-        ready = False
-        while time.time() < deadline:
-            # First ensure the assigned host port accepts TCP connections
-            try:
-                with socket.create_connection(("localhost", int(host_port)), timeout=1):
-                    port_ready = True
-            except OSError:
-                port_ready = False
-                continue
-
-            # Then check pg_isready inside the container
-            cmd = [
-                "docker",
-                "exec",
-                container_id,
-                "pg_isready",
-                "-U",
-                "exampleuser",
-                "-d",
-                "benchmark_results",
-            ]
-            proc = subprocess.run(cmd, capture_output=True, text=True)
-            if port_ready and proc.returncode == 0:
-                ready = True
-                break
-            time.sleep(1)
-
-        if not ready:
-            pytest.fail("Postgres did not become ready within timeout.")
+        proc = subprocess.run(cmd, capture_output=True, text=True)
+        return proc.returncode == 0
 
-        yield container_id, host_port
-    finally:
-        if container_id:
-            res = subprocess.run(
-                ["docker", "stop", container_id], capture_output=True, 
text=True, timeout=60
-            )
-            if res.returncode != 0:
-                pytest.fail(
-                    f"Docker command returned non-zero exit code: {res.returncode}\nStdout: {res.stdout}\nStderr: {res.stderr}"
-                )
+    return _inner
 
 
-def _remove_trailing_whitespaces(s: str) -> str:
-    return "\n".join(line.rstrip() for line in s.splitlines())
+@contextmanager
+def postgres_container(username, password, database):
+    """
+    Context manager for running a PostgreSQL container.
+    Yields the container ID and ensures cleanup on exit.
+    """
+    with container(
+        "postgres:latest",
+        env={
+            "POSTGRES_USER": username,
+            "POSTGRES_PASSWORD": password,
+            "POSTGRES_DB": database,
+        },
+        ports=[5432],
+        volumes={
+            str(Path("examples/postgresql/init-db").resolve()): "/docker-entrypoint-initdb.d",
+        },
+        readiness_check=_postgres_readiness_check_f(username, database),
+    ) as (container_id, port_map):
+        yield container_id, str(port_map[5432])

Reply via email to