This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6bac899dd1a Implementing Task SDK integration tests for Dag Run operations (#58666)
6bac899dd1a is described below
commit 6bac899dd1a3cd76f783b06f1484995c1b59f404
Author: Jeongwoo Do <[email protected]>
AuthorDate: Fri Nov 28 09:41:56 2025 +0900
Implementing Task SDK integration tests for Dag Run operations (#58666)
* implement dag run integration test
* fix docs
---------
Co-authored-by: Amogh Desai <[email protected]>
---
.../task_sdk_tests/test_dag_run_operations.py | 102 +++++++++++++++++----
1 file changed, 82 insertions(+), 20 deletions(-)
diff --git a/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py b/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
index b487279a71a..81f61f1cba0 100644
--- a/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
+++ b/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
@@ -21,19 +21,28 @@ Integration tests for DAG Run operations.
These tests validate the Execution API endpoints for DAG Run operations:
- get_state(): Get DAG run state
- get_count(): Get count of DAG runs
+- get_previous(): Get previous DAG run
"""
from __future__ import annotations
+from datetime import datetime, timedelta, timezone
+
import pytest
+from airflow.sdk.api.client import ServerResponseError
from airflow.sdk.api.datamodels._generated import DagRunStateResponse
-from airflow.sdk.execution_time.comms import DRCount
+from airflow.sdk.execution_time.comms import DRCount, PreviousDagRunResult
from task_sdk_tests import console
def test_dag_run_get_state(sdk_client, dag_info):
- """Test getting DAG run state."""
+ """
+ Test getting state for DAG run.
+
+ Expected: DagRunStateResponse with running or success state
+ Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
+ """
console.print("[yellow]Getting DAG run state...")
response = sdk_client.dag_runs.get_state(
@@ -53,7 +62,12 @@ def test_dag_run_get_state(sdk_client, dag_info):
def test_dag_run_get_count(sdk_client, dag_info):
- """Test getting count of DAG runs."""
+ """
+ Test getting count of DAG run.
+
+ Expected: DRCount with count >= 1
+ Endpoint: GET /execution/dag-runs/count
+ """
console.print("[yellow]Getting DAG run count...")
response = sdk_client.dag_runs.get_count(
@@ -72,31 +86,54 @@ def test_dag_run_get_count(sdk_client, dag_info):
console.print("[green]✅ DAG run get count test passed!")
-@pytest.mark.skip(reason="TODO: Implement DAG Run get_state (not found) test")
-def test_dag_run_get_state_not_found(sdk_client):
+def test_dag_run_get_state_not_found(sdk_client, dag_info):
"""
Test getting state for non-existent DAG run.
- Expected: ErrorResponse with DAGRUN_NOT_FOUND error
+ Expected: ServerResponseError with 404 status code
Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
"""
- console.print("[yellow]TODO: Implement test_dag_run_get_state_not_found")
- raise NotImplementedError("test_dag_run_get_state_not_found not implemented")
+ console.print("[yellow]Getting non-existent DAG run state...")
+
+ with pytest.raises(ServerResponseError) as exc_info:
+ sdk_client.dag_runs.get_state(
+ dag_id="not_exist",
+ run_id=dag_info["dag_run_id"],
+ )
+
+ console.print(" Non-existent DAG Run State Response ".center(72, "="))
+ console.print(f"[bright_blue]Exception Type:[/] {type(exc_info.value).__name__}")
+ console.print(f"[bright_blue]Status Code:[/] {exc_info.value.response.status_code}")
+ console.print(f"[bright_blue]Error Message:[/] {str(exc_info.value)}")
+ console.print("=" * 72)
+
+ assert exc_info.value.response.status_code == 404
+ console.print("[green]✅ DAG run get state (not found) test passed!")
-@pytest.mark.skip(reason="TODO: Implement DAG Run get_count (not found) test")
-def test_dag_run_get_count_not_found(sdk_client):
+def test_dag_run_get_count_not_found(sdk_client, dag_info):
"""
- Test getting count for non-existent DAG.
+ Test getting count of non-existent DAG run.
Expected: DRCount with count=0
Endpoint: GET /execution/dag-runs/count
"""
- console.print("[yellow]TODO: Implement test_dag_run_get_count_not_found")
- raise NotImplementedError("test_dag_run_get_count_not_found not implemented")
+ console.print("[yellow]Getting non-existent DAG run count...")
+
+ response = sdk_client.dag_runs.get_count(
+ dag_id="not_exist",
+ run_ids=[dag_info["dag_run_id"]],
+ )
+ console.print(" Non-existent DAG Run Count Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Count:[/] {response.count}")
+ console.print("=" * 72)
+
+ assert isinstance(response, DRCount)
+ assert response.count == 0, f"Expected 0 DAG run, got {response.count}"
+ console.print("[green]✅ Non-existent DAG run get count test passed!")
-@pytest.mark.skip(reason="TODO: Implement DAG Run get_previous test")
def test_dag_run_get_previous(sdk_client, dag_info):
"""
Test getting previous DAG run before a logical date.
@@ -104,17 +141,42 @@ def test_dag_run_get_previous(sdk_client, dag_info):
Expected: PreviousDagRunResult with dag_run field
Endpoint: GET /execution/dag-runs/{dag_id}/previous
"""
- console.print("[yellow]TODO: Implement test_dag_run_get_previous")
- raise NotImplementedError("test_dag_run_get_previous not implemented")
+ console.print("[yellow]Getting previous DAG run state...")
+
+ response = sdk_client.dag_runs.get_previous(
+ dag_id=dag_info["dag_id"],
+ logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
+ )
+
+ console.print(" Previous DAG Run Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Previous DAG Run:[/] {response.dag_run}")
+ console.print("=" * 72)
+
+ assert isinstance(response, PreviousDagRunResult)
+ assert response.dag_run is not None
+ console.print("[green]✅ Previous DAG run get test passed!")
-@pytest.mark.skip(reason="TODO: Implement DAG Run get_previous (not found) test")
def test_dag_run_get_previous_not_found(sdk_client):
"""
Test getting previous DAG run for non-existent DAG.
- Expected: ErrorResponse with DAG_NOT_FOUND error
+ Expected: PreviousDagRunResult with dag_run is None
Endpoint: GET /execution/dag-runs/{dag_id}/previous
"""
- console.print("[yellow]TODO: Implement test_dag_run_get_previous_not_found")
- raise NotImplementedError("test_dag_run_get_previous_not_found not implemented")
+ console.print("[yellow]Getting non-existent previous DAG run...")
+
+ response = sdk_client.dag_runs.get_previous(
+ dag_id="not_exist",
+ logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
+ )
+
+ console.print(" Non-existent Previous DAG Run Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Detail:[/] {response.dag_run}")
+ console.print("=" * 72)
+
+ assert isinstance(response, PreviousDagRunResult)
+ assert response.dag_run is None
+ console.print("[green]✅ Previous DAG run (not found) get test passed!")