This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ac14b470d25 Add task SDK integration test foundation with JWT auth and 
session fixtures (#56139)
ac14b470d25 is described below

commit ac14b470d2534edc5cc7d50877cf1aa41bfe65f9
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Oct 14 21:56:51 2025 +0530

    Add task SDK integration test foundation with JWT auth and session fixtures 
(#56139)
---
 .../constants.py => dags/test_dag.py}              |  36 ++-
 task-sdk-tests/docker/docker-compose.yaml          |  18 +-
 task-sdk-tests/tests/task_sdk_tests/__init__.py    |   8 +
 task-sdk-tests/tests/task_sdk_tests/conftest.py    | 245 ++++++++++++++++++++-
 task-sdk-tests/tests/task_sdk_tests/constants.py   |  21 +-
 task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py  | 107 +++++++++
 .../tests/task_sdk_tests/test_task_sdk_health.py   | 130 ++---------
 7 files changed, 430 insertions(+), 135 deletions(-)

diff --git a/task-sdk-tests/tests/task_sdk_tests/constants.py 
b/task-sdk-tests/dags/test_dag.py
similarity index 53%
copy from task-sdk-tests/tests/task_sdk_tests/constants.py
copy to task-sdk-tests/dags/test_dag.py
index 4b65352c6c4..c8f19597ba7 100644
--- a/task-sdk-tests/tests/task_sdk_tests/constants.py
+++ b/task-sdk-tests/dags/test_dag.py
@@ -16,18 +16,32 @@
 # under the License.
 from __future__ import annotations
 
-import os
-from pathlib import Path
+import time
 
-AIRFLOW_ROOT_PATH = Path(__file__).resolve().parents[3]
-TASK_SDK_TESTS_ROOT = Path(__file__).resolve().parents[2]
+from airflow.sdk import DAG, task
 
-DEFAULT_PYTHON_MAJOR_MINOR_VERSION = "3.10"
-DEFAULT_DOCKER_IMAGE = 
f"ghcr.io/apache/airflow/main/prod/python{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}:latest"
-DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE
+dag = DAG("test_dag", description="Test DAG for Task SDK testing with 
long-running task", schedule=None)
 
-DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
-TASK_SDK_HOST_PORT = os.environ.get("TASK_SDK_HOST_PORT", "localhost:8080")
-TASK_SDK_API_VERSION = "2025-08-10"
 
-DOCKER_COMPOSE_FILE_PATH = TASK_SDK_TESTS_ROOT / "docker" / 
"docker-compose.yaml"
+@task(dag=dag)
+def get_task_instance_id(ti=None):
+    """Task that returns its own task instance ID"""
+    return str(ti.id)
+
+
+@task(dag=dag)
+def long_running_task(ti=None):
+    """Long-running task that sleeps for 5 minutes to allow testing"""
+    print(f"Starting long-running task with TI ID: {ti.id}")
+    print("This task will run for 5 minutes to allow API testing...")
+
+    time.sleep(3000)
+
+    print("Long-running task completed!")
+    return "test completed"
+
+
+get_ti_id = get_task_instance_id()
+long_task = long_running_task()
+
+get_ti_id >> long_task
diff --git a/task-sdk-tests/docker/docker-compose.yaml 
b/task-sdk-tests/docker/docker-compose.yaml
index 29ece3d5316..1bf57694709 100644
--- a/task-sdk-tests/docker/docker-compose.yaml
+++ b/task-sdk-tests/docker/docker-compose.yaml
@@ -22,12 +22,20 @@ x-airflow-common:
   environment:
     &airflow-common-env
     AIRFLOW__CORE__EXECUTOR: LocalExecutor
-    AIRFLOW__CORE__AUTH_MANAGER: 
airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
+    # yamllint disable rule:line-length
+    AIRFLOW__CORE__AUTH_MANAGER: 
'airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager'
+    AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS: 'true'
     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres/airflow
     AIRFLOW__CORE__FERNET_KEY: ''
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
+    AIRFLOW__CORE__DAGS_FOLDER: '/opt/airflow/dags'
     AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 
'http://airflow-apiserver:8080/execution/'
+    AIRFLOW__API__BASE_URL: 'http://airflow-apiserver:8080/'
+    AIRFLOW__API_AUTH__JWT_SECRET: 'test-secret-key-for-testing'
   user: "${AIRFLOW_UID:-50000}:0"
+  volumes:
+    - ${PWD}/dags:/opt/airflow/dags
+    - ${PWD}/logs:/opt/airflow/logs
   depends_on:
     &airflow-common-depends-on
     postgres:
@@ -103,3 +111,11 @@ services:
         condition: service_healthy
       airflow-init:
         condition: service_completed_successfully
+
+  airflow-dag-processor:
+    <<: *airflow-common
+    command: dag-processor
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
diff --git a/task-sdk-tests/tests/task_sdk_tests/__init__.py 
b/task-sdk-tests/tests/task_sdk_tests/__init__.py
index 13a83393a91..973b8fdce34 100644
--- a/task-sdk-tests/tests/task_sdk_tests/__init__.py
+++ b/task-sdk-tests/tests/task_sdk_tests/__init__.py
@@ -14,3 +14,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
+from rich.console import Console
+
+console = Console(width=400, color_system="standard")
+
+
+__all__ = ["console"]
diff --git a/task-sdk-tests/tests/task_sdk_tests/conftest.py 
b/task-sdk-tests/tests/task_sdk_tests/conftest.py
index 47aed12906f..2b2e8274330 100644
--- a/task-sdk-tests/tests/task_sdk_tests/conftest.py
+++ b/task-sdk-tests/tests/task_sdk_tests/conftest.py
@@ -19,16 +19,121 @@ from __future__ import annotations
 import os
 import subprocess
 import sys
+from pathlib import Path
 
-from task_sdk_tests.constants import AIRFLOW_ROOT_PATH
+import pytest
 
+from task_sdk_tests import console
+from task_sdk_tests.constants import (
+    AIRFLOW_ROOT_PATH,
+    DOCKER_COMPOSE_FILE_PATH,
+    DOCKER_IMAGE,
+    TASK_SDK_HOST_PORT,
+)
 
-def pytest_sessionstart(session):
-    """Install Task SDK at the very start of the pytest session."""
-    from rich.console import Console
 
-    console = Console(width=400, color_system="standard")
+def print_diagnostics(compose, compose_version, docker_version):
+    """Print diagnostic information when test fails."""
+    console.print("[red]=== DIAGNOSTIC INFORMATION ===[/]")
+    console.print(f"Docker version: {docker_version}")
+    console.print(f"Docker Compose version: {compose_version}")
+    console.print("\n[yellow]Container Status:[/]")
+    try:
+        containers = compose.compose.ps()
+        for container in containers:
+            console.print(f"  {container.name}: {container.state}")
+    except Exception as e:
+        console.print(f"  Error getting container status: {e}")
+
+    console.print("\n[yellow]Container Logs:[/]")
+    try:
+        logs = compose.compose.logs()
+        console.print(logs)
+    except Exception as e:
+        console.print(f"  Error getting logs: {e}")
+
+
+def debug_environment():
+    """Debug the Python environment setup in CI."""
+
+    import os
+    import subprocess
+    import sys
+
+    console.print("[yellow]===== CI ENVIRONMENT DEBUG =====")
+    console.print(f"[blue]Python executable: {sys.executable}")
+    console.print(f"[blue]Python version: {sys.version}")
+    console.print(f"[blue]Working directory: {os.getcwd()}")
+    console.print(f"[blue]VIRTUAL_ENV: {os.environ.get('VIRTUAL_ENV', 'Not 
set')}")
+    console.print(f"[blue]PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not 
set')}")
+
+    console.print(f"[blue]Python executable exists: 
{Path(sys.executable).exists()}")
+    if Path(sys.executable).is_symlink():
+        console.print(f"[blue]Python executable is symlink to: 
{Path(sys.executable).readlink()}")
+
+    try:
+        uv_python = subprocess.check_output(["uv", "python", "find"], 
text=True).strip()
+        console.print(f"[cyan]UV Python: {uv_python}")
+        console.print(f"[green]Match: {uv_python == sys.executable}")
+
+        console.print(f"[cyan]UV Python exists: {Path(uv_python).exists()}")
+        if Path(uv_python).is_symlink():
+            console.print(f"[cyan]UV Python is symlink to: 
{Path(uv_python).readlink()}")
+    except Exception as e:
+        console.print(f"[red]UV Python error: {e}")
+
+    # Check what's installed in current environment
+    try:
+        import airflow
+
+        console.print(f"[green]✅ airflow already available: 
{airflow.__file__}")
+    except ImportError:
+        console.print("[red]❌ airflow not available in current environment")
+
+    console.print("[yellow]================================")
 
+
[email protected](scope="session")
+def docker_compose_setup(tmp_path_factory):
+    """Start docker-compose once per session."""
+    import os
+    from shutil import copyfile
+
+    from python_on_whales import DockerClient, docker
+
+    # Create temp directory for docker-compose
+    tmp_dir = tmp_path_factory.mktemp("airflow-task-sdk-test")
+    tmp_docker_compose_file = tmp_dir / "docker-compose.yaml"
+    copyfile(DOCKER_COMPOSE_FILE_PATH, tmp_docker_compose_file)
+
+    # Set environment variables
+    os.environ["AIRFLOW_IMAGE_NAME"] = DOCKER_IMAGE
+    os.environ["TASK_SDK_VERSION"] = os.environ.get("TASK_SDK_VERSION", 
"1.1.0")
+
+    compose = DockerClient(compose_files=[str(tmp_docker_compose_file)])
+
+    try:
+        console.print("[yellow]Starting docker-compose for session...")
+        compose.compose.up(detach=True, wait=True)
+        console.print("[green]Docker compose started successfully!\n")
+
+        yield compose
+    except Exception as e:
+        console.print(f"[red]❌ Docker compose failed to start: {e}")
+
+        debug_environment()
+        print_diagnostics(compose, compose.version(), docker.version())
+
+        raise
+    finally:
+        if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
+            console.print("[yellow]Cleaning up docker-compose...")
+            compose.compose.down(remove_orphans=True, volumes=True, quiet=True)
+            console.print("[green]Docker compose cleaned up")
+
+
+def pytest_sessionstart(session):
+    """Install Task SDK at the very start of the pytest session."""
     task_sdk_version = os.environ.get("TASK_SDK_VERSION", "1.1.0")
     console.print(
         f"[yellow]Installing apache-airflow-task-sdk=={task_sdk_version} via 
pytest_sessionstart..."
@@ -69,3 +174,133 @@ def pytest_sessionstart(session):
         console.print(f"[red]Stdout: {e.stdout}")
         console.print(f"[red]Stderr: {e.stderr}")
         raise
+
+
[email protected](scope="session")
+def airflow_test_setup(docker_compose_setup):
+    """Fixed session-scoped fixture that matches UI behavior."""
+    import time
+
+    import requests
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.timezone import utcnow
+    from task_sdk_tests.jwt_plugin import generate_jwt_token
+
+    time.sleep(15)
+
+    # Step 1: Get auth token
+    auth_url = "http://localhost:8080/auth/token"
+    try:
+        auth_response = requests.get(auth_url, timeout=10)
+        auth_response.raise_for_status()
+        auth_token = auth_response.json()["access_token"]
+        console.print("[green]✅ Got auth token")
+    except Exception as e:
+        raise e
+
+    # Step 2: Check and unpause DAG
+    headers = {"Authorization": f"Bearer {auth_token}", "Content-Type": 
"application/json"}
+
+    console.print("[yellow]Checking DAG status...")
+    dag_response = requests.get("http://localhost:8080/api/v2/dags/test_dag", headers=headers)
+    dag_response.raise_for_status()
+    dag_data = dag_response.json()
+
+    if dag_data.get("is_paused", True):
+        console.print("[yellow]Unpausing DAG...")
+        unpause_response = requests.patch(
+            "http://localhost:8080/api/v2/dags/test_dag", json={"is_paused": False}, headers=headers
+        )
+        unpause_response.raise_for_status()
+        console.print("[green]✅ DAG unpaused")
+    logical_date = utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-3] + "Z"
+    payload = {"conf": {}, "logical_date": logical_date}
+
+    trigger_response = requests.post(
+        "http://localhost:8080/api/v2/dags/test_dag/dagRuns", json=payload, headers=headers, timeout=30
+    )
+
+    console.print(f"[blue]Trigger DAG Run response status: 
{trigger_response.status_code}")
+    console.print(f"[blue]Trigger DAG Run response: {trigger_response.text}")
+
+    trigger_response.raise_for_status()
+    dag_run_data = trigger_response.json()
+    dag_run_id = dag_run_data["dag_run_id"]
+
+    console.print(f"[green]✅ DAG triggered: {dag_run_id}")
+
+    # Step 4: Get task instance for testing
+    console.print("[yellow]Waiting for any task instance...")
+    ti_id = None
+
+    for attempt in range(20):
+        try:
+            ti_url = f"http://localhost:8080/api/v2/dags/test_dag/dagRuns/{dag_run_id}/taskInstances"
+            ti_response = requests.get(ti_url, headers=headers, timeout=10)
+            ti_response.raise_for_status()
+
+            task_instances = ti_response.json().get("task_instances", [])
+
+            if task_instances:
+                first_ti = task_instances[0]
+                ti_id = first_ti.get("id")
+
+                if ti_id:
+                    console.print(f"[green]✅ Using task instance from 
'{first_ti.get('task_id')}'")
+                    console.print(f"[green]    State: {first_ti.get('state')}")
+                    console.print(f"[green]    Instance ID: {ti_id}")
+                    break
+            else:
+                console.print(f"[blue]Waiting for tasks (attempt {attempt + 
1}/20)")
+
+        except Exception as e:
+            console.print(f"[yellow]Task check failed: {e}")
+
+        time.sleep(2)
+
+    if not ti_id:
+        console.print("[red]❌ Task instances never appeared. Final debug 
info:")
+        raise TimeoutError("No task instance found within timeout period")
+
+    # Step 5: Create SDK client
+    jwt_token = generate_jwt_token(ti_id)
+    sdk_client = Client(base_url=f"http://{TASK_SDK_HOST_PORT}/execution";, 
token=jwt_token)
+
+    return {
+        "auth_token": auth_token,
+        "dag_info": {"dag_id": "test_dag", "dag_run_id": dag_run_id, 
"logical_date": logical_date},
+        "task_instance_id": ti_id,
+        "sdk_client": sdk_client,
+        "core_api_headers": headers,
+    }
+
+
[email protected](scope="session")
+def auth_token(airflow_test_setup):
+    """Get the auth token from setup."""
+    return airflow_test_setup["auth_token"]
+
+
[email protected](scope="session")
+def dag_info(airflow_test_setup):
+    """Get DAG information from setup."""
+    return airflow_test_setup["dag_info"]
+
+
[email protected](scope="session")
+def task_instance_id(airflow_test_setup):
+    """Get task instance ID from setup."""
+    return airflow_test_setup["task_instance_id"]
+
+
[email protected](scope="session")
+def sdk_client(airflow_test_setup):
+    """Get authenticated Task SDK client from setup."""
+    return airflow_test_setup["sdk_client"]
+
+
[email protected](scope="session")
+def core_api_headers(airflow_test_setup):
+    """Get Core API headers from setup."""
+    return airflow_test_setup["core_api_headers"]
diff --git a/task-sdk-tests/tests/task_sdk_tests/constants.py 
b/task-sdk-tests/tests/task_sdk_tests/constants.py
index 4b65352c6c4..1e35e0106c2 100644
--- a/task-sdk-tests/tests/task_sdk_tests/constants.py
+++ b/task-sdk-tests/tests/task_sdk_tests/constants.py
@@ -28,6 +28,25 @@ DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or 
DEFAULT_DOCKER_IMAGE
 
 DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
 TASK_SDK_HOST_PORT = os.environ.get("TASK_SDK_HOST_PORT", "localhost:8080")
-TASK_SDK_API_VERSION = "2025-08-10"
+
+
+# This represents the Execution API schema version, NOT the Task SDK package 
version.
+#
+# Purpose:
+# - Defines the API contract between Task SDK and Airflow's Execution API
+# - Enables backward compatibility when API schemas evolve
+# - Uses calver format (YYYY-MM-DD) based on expected release dates
+#
+# Usage:
+# - Sent as "Airflow-API-Version" header with every API request
+# - Server uses this to determine which schema version to serve
+# - Allows older Task SDK versions to work with newer Airflow servers
+#
+# Version vs Package Version:
+# - API Version: "2025-09-23" (schema compatibility)
+# - Package Version: "1.1.0" (Task SDK release version)
+#
+# Keep this in sync with: task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+TASK_SDK_API_VERSION = "2025-09-23"
 
 DOCKER_COMPOSE_FILE_PATH = TASK_SDK_TESTS_ROOT / "docker" / 
"docker-compose.yaml"
diff --git a/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py 
b/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py
new file mode 100644
index 00000000000..3caf4193eea
--- /dev/null
+++ b/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""JWT Token Plugin for Task SDK Integration Tests."""
+
+from __future__ import annotations
+
+import os
+import uuid
+from datetime import datetime, timezone
+from typing import Any
+
+import jwt
+
+
+class JWTTokenGenerator:
+    """Generator for JWT tokens used in Task SDK API authentication."""
+
+    def __init__(self):
+        """Initialize JWT configuration from environment variables."""
+        self.secret = os.getenv("AIRFLOW__API_AUTH__JWT_SECRET", 
"test-secret-key-for-testing")
+        self.issuer = os.getenv("AIRFLOW__API_AUTH__JWT_ISSUER", 
"airflow-test")
+        self.audience = os.getenv("AIRFLOW__API_AUTH__JWT_AUDIENCE", 
"urn:airflow.apache.org:task")
+        self.algorithm = os.getenv("AIRFLOW__API_AUTH__JWT_ALGORITHM", "HS512")
+        self.kid = os.getenv("AIRFLOW__API_AUTH__JWT_KID", "test-key-id")
+
+    def generate_token(
+        self,
+        task_instance_id: str,
+        expires_in_seconds: int = 3600,
+        extra_claims: dict[str, Any] | None = None,
+        extra_headers: dict[str, Any] | None = None,
+    ) -> str:
+        """
+        Generate a JWT token for task instance authentication.
+
+        Args:
+            task_instance_id: The task instance ID to use as the 'sub' claim
+            expires_in_seconds: Token expiration time in seconds (default: 1 
hour)
+            extra_claims: Additional claims to include in the token
+            extra_headers: Additional headers to include in the token
+
+        Returns:
+            JWT token as a string
+        """
+        now = int(datetime.now(timezone.utc).timestamp())
+
+        claims = {
+            "jti": uuid.uuid4().hex,
+            "iss": self.issuer,
+            "aud": self.audience,
+            "nbf": now,
+            "exp": now + expires_in_seconds,
+            "iat": now,
+            "sub": task_instance_id,
+        }
+
+        # Remove audience if not set
+        if not claims.get("aud"):
+            del claims["aud"]
+
+        # Add extra claims if provided
+        if extra_claims:
+            claims.update(extra_claims)
+
+        # Base JWT headers
+        headers = {
+            "alg": self.algorithm,
+            "kid": self.kid,
+        }
+
+        # Add extra headers if provided
+        if extra_headers:
+            headers.update(extra_headers)
+
+        # Generate and return the token
+        token = jwt.encode(claims, self.secret, algorithm=self.algorithm, 
headers=headers)
+        return token
+
+
+def generate_jwt_token(task_instance_id: str, expires_in_seconds: int = 3600) 
-> str:
+    """
+    Convenience function to generate a JWT token.
+
+    Args:
+        task_instance_id: The task instance ID to use as the 'sub' claim
+        expires_in_seconds: Token expiration time in seconds (default: 1 hour)
+
+    Returns:
+        JWT token as a string
+    """
+    generator = JWTTokenGenerator()
+    return generator.generate_token(task_instance_id, expires_in_seconds)
diff --git a/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py 
b/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
index d4b3b1ba94d..578068c9c6a 100644
--- a/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
+++ b/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
@@ -16,129 +16,25 @@
 # under the License.
 from __future__ import annotations
 
-import os
-from pathlib import Path
-from shutil import copyfile
-
-from python_on_whales import DockerClient, docker
-from rich.console import Console
-
+from task_sdk_tests import console
 from task_sdk_tests.constants import (
-    DOCKER_COMPOSE_FILE_PATH,
-    DOCKER_IMAGE,
     TASK_SDK_API_VERSION,
-    TASK_SDK_HOST_PORT,
 )
 
-console = Console(width=400, color_system="standard")
-
-
-def print_diagnostics(compose, compose_version, docker_version):
-    """Print diagnostic information when test fails."""
-    console.print("[red]=== DIAGNOSTIC INFORMATION ===[/]")
-    console.print(f"Docker version: {docker_version}")
-    console.print(f"Docker Compose version: {compose_version}")
-    console.print("\n[yellow]Container Status:[/]")
-    try:
-        containers = compose.compose.ps()
-        for container in containers:
-            console.print(f"  {container.name}: {container.state}")
-    except Exception as e:
-        console.print(f"  Error getting container status: {e}")
-
-    console.print("\n[yellow]Container Logs:[/]")
-    try:
-        logs = compose.compose.logs()
-        console.print(logs)
-    except Exception as e:
-        console.print(f"  Error getting logs: {e}")
-
-
-def debug_environment():
-    """Debug the Python environment setup in CI."""
-
-    import os
-    import subprocess
-    import sys
-
-    console.print("[yellow]===== CI ENVIRONMENT DEBUG =====")
-    console.print(f"[blue]Python executable: {sys.executable}")
-    console.print(f"[blue]Python version: {sys.version}")
-    console.print(f"[blue]Working directory: {os.getcwd()}")
-    console.print(f"[blue]VIRTUAL_ENV: {os.environ.get('VIRTUAL_ENV', 'Not 
set')}")
-    console.print(f"[blue]PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not 
set')}")
-
-    console.print(f"[blue]Python executable exists: 
{Path(sys.executable).exists()}")
-    if Path(sys.executable).is_symlink():
-        console.print(f"[blue]Python executable is symlink to: 
{Path(sys.executable).readlink()}")
-
-    try:
-        uv_python = subprocess.check_output(["uv", "python", "find"], 
text=True).strip()
-        console.print(f"[cyan]UV Python: {uv_python}")
-        console.print(f"[green]Match: {uv_python == sys.executable}")
-
-        console.print(f"[cyan]UV Python exists: {Path(uv_python).exists()}")
-        if Path(uv_python).is_symlink():
-            console.print(f"[cyan]UV Python is symlink to: 
{Path(uv_python).readlink()}")
-    except Exception as e:
-        console.print(f"[red]UV Python error: {e}")
-
-    # Check what's installed in current environment
-    try:
-        import airflow
-
-        console.print(f"[green]✅ airflow already available: 
{airflow.__file__}")
-    except ImportError:
-        console.print("[red]❌ airflow not available in current environment")
-
-    console.print("[yellow]================================")
-
-
-def test_task_sdk_health(tmp_path_factory, monkeypatch):
-    """Test Task SDK health check using docker-compose environment."""
-    tmp_dir = tmp_path_factory.mktemp("airflow-task-sdk-test")
-    console.print(f"[yellow]Tests are run in {tmp_dir}")
-
-    # Copy docker-compose.yaml to temp directory
-    tmp_docker_compose_file = tmp_dir / "docker-compose.yaml"
-    copyfile(DOCKER_COMPOSE_FILE_PATH, tmp_docker_compose_file)
-
-    # Set environment variables for the test
-    monkeypatch.setenv("AIRFLOW_IMAGE_NAME", DOCKER_IMAGE)
-    monkeypatch.setenv("TASK_SDK_VERSION", os.environ.get("TASK_SDK_VERSION", 
"1.0.3"))
-
-    # Initialize Docker client
-    compose = DockerClient(compose_files=[str(tmp_docker_compose_file)])
-
-    try:
-        compose.compose.up(detach=True, wait=True)
-        console.print("[green]Docker compose started for task SDK test\n")
-
-        try:
-            from airflow.sdk.api.client import Client
-
-            console.print("[green]✅ Task SDK client imported successfully!")
-        except ImportError as e:
-            console.print(f"[red]❌ Failed to import Task SDK client: {e}")
-            raise
 
-        client = Client(base_url=f"http://{TASK_SDK_HOST_PORT}/execution", token="not-a-token")
+def test_task_sdk_health(sdk_client):
+    """Test Task SDK health check using session setup."""
+    client = sdk_client
 
-        console.print("[yellow]Making health check request...")
-        response = client.get("health/ping", headers={"Airflow-API-Version": 
TASK_SDK_API_VERSION})
+    console.print("[yellow]Making health check request...")
+    response = client.get("health/ping", headers={"Airflow-API-Version": 
TASK_SDK_API_VERSION})
 
-        console.print(" Health Check Response ".center(72, "="))
-        console.print(f"[bright_blue]Status Code:[/] {response.status_code}")
-        console.print(f"[bright_blue]Response:[/] {response.json()}")
-        console.print("=" * 72)
+    console.print(" Health Check Response ".center(72, "="))
+    console.print(f"[bright_blue]Status Code:[/] {response.status_code}")
+    console.print(f"[bright_blue]Response:[/] {response.json()}")
+    console.print("=" * 72)
 
-        assert response.status_code == 200
-        assert response.json() == {"ok": 
["airflow.api_fastapi.auth.tokens.JWTValidator"], "failing": {}}
+    assert response.status_code == 200
+    assert response.json() == {"ok": 
["airflow.api_fastapi.auth.tokens.JWTValidator"], "failing": {}}
 
-    except Exception:
-        print_diagnostics(compose, compose.version(), docker.version())
-        raise
-    finally:
-        if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
-            compose.compose.down(remove_orphans=True, volumes=True, quiet=True)
-            console.print("[green]Docker compose instance deleted")
+    console.print("[green]✅ Task SDK health check passed!")

Reply via email to