This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bac899dd1a Implementing Task SDK integration tests for Dag Run 
operations (#58666)
6bac899dd1a is described below

commit 6bac899dd1a3cd76f783b06f1484995c1b59f404
Author: Jeongwoo Do <[email protected]>
AuthorDate: Fri Nov 28 09:41:56 2025 +0900

    Implementing Task SDK integration tests for Dag Run operations (#58666)
    
    * implement dag run integration test
    
    * fix docs
    
    ---------
    
    Co-authored-by: Amogh Desai <[email protected]>
---
 .../task_sdk_tests/test_dag_run_operations.py      | 102 +++++++++++++++++----
 1 file changed, 82 insertions(+), 20 deletions(-)

diff --git 
a/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py 
b/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
index b487279a71a..81f61f1cba0 100644
--- a/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
+++ b/task-sdk-integration-tests/tests/task_sdk_tests/test_dag_run_operations.py
@@ -21,19 +21,28 @@ Integration tests for DAG Run operations.
 These tests validate the Execution API endpoints for DAG Run operations:
 - get_state(): Get DAG run state
 - get_count(): Get count of DAG runs
+- get_previous(): Get previous DAG run
 """
 
 from __future__ import annotations
 
+from datetime import datetime, timedelta, timezone
+
 import pytest
 
+from airflow.sdk.api.client import ServerResponseError
 from airflow.sdk.api.datamodels._generated import DagRunStateResponse
-from airflow.sdk.execution_time.comms import DRCount
+from airflow.sdk.execution_time.comms import DRCount, PreviousDagRunResult
 from task_sdk_tests import console
 
 
 def test_dag_run_get_state(sdk_client, dag_info):
-    """Test getting DAG run state."""
+    """
+    Test getting state for DAG run.
+
+    Expected: DagRunStateResponse with running or success state
+    Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
+    """
     console.print("[yellow]Getting DAG run state...")
 
     response = sdk_client.dag_runs.get_state(
@@ -53,7 +62,12 @@ def test_dag_run_get_state(sdk_client, dag_info):
 
 
 def test_dag_run_get_count(sdk_client, dag_info):
-    """Test getting count of DAG runs."""
+    """
+    Test getting count of DAG run.
+
+    Expected: DRCount with count >= 1
+    Endpoint: GET /execution/dag-runs/count
+    """
     console.print("[yellow]Getting DAG run count...")
 
     response = sdk_client.dag_runs.get_count(
@@ -72,31 +86,54 @@ def test_dag_run_get_count(sdk_client, dag_info):
     console.print("[green]✅ DAG run get count test passed!")
 
 
[email protected](reason="TODO: Implement DAG Run get_state (not found) test")
-def test_dag_run_get_state_not_found(sdk_client):
+def test_dag_run_get_state_not_found(sdk_client, dag_info):
     """
     Test getting state for non-existent DAG run.
 
-    Expected: ErrorResponse with DAGRUN_NOT_FOUND error
+    Expected: ServerResponseError with 404 status code
     Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
     """
-    console.print("[yellow]TODO: Implement test_dag_run_get_state_not_found")
-    raise NotImplementedError("test_dag_run_get_state_not_found not 
implemented")
+    console.print("[yellow]Getting non-existent DAG run state...")
+
+    with pytest.raises(ServerResponseError) as exc_info:
+        sdk_client.dag_runs.get_state(
+            dag_id="not_exist",
+            run_id=dag_info["dag_run_id"],
+        )
+
+    console.print(" Non-existent DAG Run State Response ".center(72, "="))
+    console.print(f"[bright_blue]Exception Type:[/] 
{type(exc_info.value).__name__}")
+    console.print(f"[bright_blue]Status Code:[/] 
{exc_info.value.response.status_code}")
+    console.print(f"[bright_blue]Error Message:[/] {str(exc_info.value)}")
+    console.print("=" * 72)
+
+    assert exc_info.value.response.status_code == 404
+    console.print("[green]✅ DAG run get state (not found) test passed!")
 
 
[email protected](reason="TODO: Implement DAG Run get_count (not found) test")
-def test_dag_run_get_count_not_found(sdk_client):
+def test_dag_run_get_count_not_found(sdk_client, dag_info):
     """
-    Test getting count for non-existent DAG.
+    Test getting count of non-existent DAG run.
 
     Expected: DRCount with count=0
     Endpoint: GET /execution/dag-runs/count
     """
-    console.print("[yellow]TODO: Implement test_dag_run_get_count_not_found")
-    raise NotImplementedError("test_dag_run_get_count_not_found not 
implemented")
+    console.print("[yellow]Getting non-existent DAG run count...")
+
+    response = sdk_client.dag_runs.get_count(
+        dag_id="not_exist",
+        run_ids=[dag_info["dag_run_id"]],
+    )
+    console.print(" Non-existent DAG Run Count Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+    console.print(f"[bright_blue]Count:[/] {response.count}")
+    console.print("=" * 72)
+
+    assert isinstance(response, DRCount)
+    assert response.count == 0, f"Expected 0 DAG run, got {response.count}"
+    console.print("[green]✅ Non-existent DAG run get count test passed!")
 
 
[email protected](reason="TODO: Implement DAG Run get_previous test")
 def test_dag_run_get_previous(sdk_client, dag_info):
     """
     Test getting previous DAG run before a logical date.
@@ -104,17 +141,42 @@ def test_dag_run_get_previous(sdk_client, dag_info):
     Expected: PreviousDagRunResult with dag_run field
     Endpoint: GET /execution/dag-runs/{dag_id}/previous
     """
-    console.print("[yellow]TODO: Implement test_dag_run_get_previous")
-    raise NotImplementedError("test_dag_run_get_previous not implemented")
+    console.print("[yellow]Getting previous DAG run state...")
+
+    response = sdk_client.dag_runs.get_previous(
+        dag_id=dag_info["dag_id"],
+        logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
+    )
+
+    console.print(" Previous DAG Run Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+    console.print(f"[bright_blue]Previous DAG Run:[/] {response.dag_run}")
+    console.print("=" * 72)
+
+    assert isinstance(response, PreviousDagRunResult)
+    assert response.dag_run is not None
+    console.print("[green]✅ Previous DAG run get test passed!")
 
 
[email protected](reason="TODO: Implement DAG Run get_previous (not found) 
test")
 def test_dag_run_get_previous_not_found(sdk_client):
     """
     Test getting previous DAG run for non-existent DAG.
 
-    Expected: ErrorResponse with DAG_NOT_FOUND error
+    Expected: PreviousDagRunResult with dag_run is None
     Endpoint: GET /execution/dag-runs/{dag_id}/previous
     """
-    console.print("[yellow]TODO: Implement 
test_dag_run_get_previous_not_found")
-    raise NotImplementedError("test_dag_run_get_previous_not_found not 
implemented")
+    console.print("[yellow]Getting non-existent previous DAG run...")
+
+    response = sdk_client.dag_runs.get_previous(
+        dag_id="not_exist",
+        logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
+    )
+
+    console.print(" Non-existent Previous DAG Run Response ".center(72, "="))
+    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+    console.print(f"[bright_blue]Detail:[/] {response.dag_run}")
+    console.print("=" * 72)
+
+    assert isinstance(response, PreviousDagRunResult)
+    assert response.dag_run is None
+    console.print("[green]✅ Previous DAG run (not found) get test passed!")

Reply via email to