amoghrajesh opened a new issue, #58178:
URL: https://github.com/apache/airflow/issues/58178

   ### Body
   
   This guide explains how to implement the stub integration tests for the Task 
SDK Execution API endpoints.
   
   ## Overview
   
   We have placeholder test functions (stubs) with `@pytest.mark.skip` 
decorators that need to be implemented. Each stub includes:
   - Docstring describing what to test
   - Expected response type
   - Endpoint information
   - `NotImplementedError` placeholder for the time being
   
   The task is to implement these tests following the patterns established in 
existing tests.
   
   **Note**: For realistic examples of how these API calls work in practice, 
see the actual implementation in `task-sdk/src/airflow/sdk/api/client.py`.
   This file contains the `Client` class and operation classes (e.g., 
`TaskInstanceOperations`, `XComOperations`) that show how the SDK makes these 
calls,
   what parameters they accept, and what they return.
   
   ## Getting Started
   
   ### Step 1: Choose a Test File
   
   Pick one test file to work on:
   - [ ] `test_xcom_operations.py` - XCom operations
   - [ ] `test_variable_operations.py` - Variable operations
   - [ ] `test_connection_operations.py` - Connection operations
   - [ ] `test_dag_run_operations.py` - DAG Run operations
   - [ ] `test_hitl_operations.py` - HITL operations
   
   **File-wise assignment is recommended** to avoid merge conflicts when 
multiple contributors work in parallel.
   Comment on the relevant GitHub issue with the file you plan to work on, and 
it will be assigned to you.
   
   ### Step 2: Understand the Pattern
   
   Study existing implemented tests in the same file or similar files:
   - `test_task_instance_operations.py` - Examples: 
`test_ti_get_previous_successful_dagrun`, `test_ti_set_rtif`, 
`test_ti_heartbeat`
   - `test_xcom_operations.py` - Examples: `test_get_xcom`, `test_set_xcom`, 
`test_xcom_delete`
   - `test_variable_operations.py` - Examples: `test_variable_get`, 
`test_variable_get_not_found`
   
   ### Step 3: Review the Stub
   
   Each stub includes:
   - `@pytest.mark.skip(reason="TODO: ...")` decorator
   - Docstring with:
     - Description of what to test
     - Expected response type
     - Endpoint path
   - `console.print("[yellow]TODO: ...")` placeholder
   - `raise NotImplementedError(...)` placeholder
   
   ### Step 4: Implement the Test
   
   1. **Remove the skip decorator**: Delete `@pytest.mark.skip(reason="...")`
   2. **Remove the NotImplementedError**: Delete `raise 
NotImplementedError(...)`
   3. **Update the console.print**: Change from `[yellow]TODO: ...` to 
`[yellow]Actual test description...`
   4. **Implement the test logic**: Follow the pattern from existing tests
   5. **Add assertions**: Verify response type, values, and behavior
   
   ## Test Implementation Patterns
   
   ### Pattern 1: Simple GET Operation
   
   ```python
   def test_resource_get(sdk_client, dag_info):
       """Test getting a resource."""
       console.print("[yellow]Getting resource...")
   
       response = sdk_client.resources.get("resource_id")
   
       console.print(" Resource Get Response ".center(72, "="))
       console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
       console.print(f"[bright_blue]Key Field:[/] {response.key_field}")
       console.print("=" * 72)
   
       assert isinstance(response, ResourceResponse)
       assert response.key_field == "expected_value"
       console.print("[green]✅ Resource get test passed!")
   ```
   
   ### Pattern 2: GET Operation (Negative Test)
   
   ```python
   def test_resource_get_not_found(sdk_client):
       """Test getting a non-existent resource."""
       console.print("[yellow]Getting non-existent resource...")
   
       response = sdk_client.resources.get("non_existent_id")
   
       console.print(" Resource Get (Not Found) Response ".center(72, "="))
       console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
       console.print(f"[bright_blue]Error Type:[/] {response.error}")
       console.print(f"[bright_blue]Detail:[/] {response.detail}")
       console.print("=" * 72)
   
       assert isinstance(response, ErrorResponse)
       assert str(response.error).endswith("RESOURCE_NOT_FOUND")
       console.print("[green]✅ Resource get (not found) test passed!")
   ```
   
   ### Pattern 3: SET/POST Operation
   
   ```python
   def test_resource_set(sdk_client):
       """Test setting a resource value."""
       console.print("[yellow]Setting resource value...")
   
       test_key = "test_resource_key"
       test_value = "test_value"
   
       response = sdk_client.resources.set(test_key, test_value)
   
       console.print(" Resource Set Response ".center(72, "="))
       console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
       console.print(f"[bright_blue]Status:[/] {response.ok}")
       console.print("=" * 72)
   
       assert isinstance(response, OKResponse)
       assert response.ok is True
   
       # Verify by getting it back
       get_response = sdk_client.resources.get(test_key)
       assert get_response.value == test_value
       console.print("[green]✅ Resource set test passed!")
   ```
   
   ### Pattern 4: DELETE Operation
   
   ```python
   def test_resource_delete(sdk_client):
       """Test deleting a resource."""
       console.print("[yellow]Deleting resource...")
   
       test_key = "test_resource_delete_key"
   
       # First set it
       sdk_client.resources.set(test_key, "to_be_deleted")
   
       # Then delete it
       response = sdk_client.resources.delete(test_key)
   
       console.print(" Resource Delete Response ".center(72, "="))
       console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
       console.print(f"[bright_blue]Status:[/] {response.ok}")
       console.print("=" * 72)
   
       assert isinstance(response, OKResponse)
       assert response.ok is True
   
       # Verify it's deleted
       get_response = sdk_client.resources.get(test_key)
       assert isinstance(get_response, ErrorResponse)
       console.print("[green]✅ Resource delete test passed!")
   ```
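
The set, get, and delete patterns above share one contract: a write returns an OK response, a read returns the stored value, and a read after delete returns an error response. The sketch below replays that round trip against an in-memory stand-in so the expected sequence is easy to follow without the Docker environment. Everything here (`StubOK`, `StubValue`, `StubError`, `StubResources`) is hypothetical and is not the SDK's API; real tests should use `sdk_client` and the real response types.

```python
from dataclasses import dataclass, field


@dataclass
class StubOK:
    """Hypothetical stand-in for an OK response."""

    ok: bool = True


@dataclass
class StubValue:
    """Hypothetical stand-in for a successful get response."""

    key: str
    value: str


@dataclass
class StubError:
    """Hypothetical stand-in for an error response."""

    error: str
    detail: dict = field(default_factory=dict)


class StubResources:
    """In-memory store that mimics the set/get/delete contract."""

    def __init__(self) -> None:
        self._store = {}

    def set(self, key: str, value: str) -> StubOK:
        self._store[key] = value
        return StubOK()

    def get(self, key: str):
        if key not in self._store:
            return StubError(error="RESOURCE_NOT_FOUND", detail={"key": key})
        return StubValue(key=key, value=self._store[key])

    def delete(self, key: str):
        if key not in self._store:
            return StubError(error="RESOURCE_NOT_FOUND", detail={"key": key})
        del self._store[key]
        return StubOK()


# Same order of steps as Pattern 4: set, delete, then verify the get fails.
resources = StubResources()
assert resources.set("test_resource_delete_key", "to_be_deleted").ok is True
assert resources.delete("test_resource_delete_key").ok is True
assert isinstance(resources.get("test_resource_delete_key"), StubError)
```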
   
   ### Pattern 5: State Transition Operations
   
   ```python
   def test_ti_succeed(sdk_client, task_instance_id):
       """Test marking a task instance as succeeded."""
       console.print("[yellow]Marking task instance as succeeded...")
   
       from airflow.sdk.timezone import utcnow
       from airflow.sdk.api.datamodels._generated import TaskOutlets
   
       # Prepare outlets (empty for this example)
       task_outlets = TaskOutlets(assets=[], asset_events=[])
       outlet_events = []
   
       sdk_client.task_instances.succeed(
           id=task_instance_id,
           when=utcnow(),
           task_outlets=task_outlets,
           outlet_events=outlet_events,
           rendered_map_index="-1",  # Use "-1" for non-mapped tasks
       )
   
       console.print(" Task Instance Succeeded ".center(72, "="))
       console.print("[bright_blue]Status:[/] Success (204 No Content)")
       console.print(f"[bright_blue]Task Instance ID:[/] {task_instance_id}")
       console.print("=" * 72)
       console.print("[green]✅ Task instance succeed test passed!")
   ```
   
   ## Available Fixtures
   
   ### `sdk_client`
   Task SDK client for general operations, authenticated as a particular task 
instance.
   ```python
   def test_example(sdk_client, dag_info):
       response = sdk_client.connections.get("test_connection")
   ```
   
   ### `dag_info`
   Dictionary with DAG information:
   - `dag_id`: DAG ID (e.g., "test_dag")
   - `dag_run_id`: DAG run ID
   - `logical_date`: Logical date string
   
   ### `task_instance_id`
   UUID string of a running task instance from `test_dag`.
   
   ### `sdk_client_for_assets`
   SDK client specifically for asset-related tests (use with 
`asset_test_setup`).
   
   ### `asset_test_setup`
   Dictionary with asset information:
   - `name`: Asset name
   - `uri`: Asset URI
   - `dag_run_id`: DAG run ID that produced the asset
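
If a test misbehaves because of unexpected fixture values, a quick sanity check along these lines may help while debugging. It is a sketch only: `check_fixture_values` is a hypothetical helper, the example values are made up, and the key names are taken from the fixture descriptions above.

```python
import uuid

# Keys listed for the dag_info fixture in this guide.
REQUIRED_DAG_INFO_KEYS = {"dag_id", "dag_run_id", "logical_date"}


def check_fixture_values(dag_info: dict, task_instance_id: str) -> None:
    """Fail early if the fixture values do not have the documented shape."""
    missing = REQUIRED_DAG_INFO_KEYS - dag_info.keys()
    assert not missing, f"dag_info is missing keys: {sorted(missing)}"
    # uuid.UUID raises ValueError if the string is not a valid UUID.
    uuid.UUID(task_instance_id)


# Illustrative values only; the real ones come from the fixtures.
example_dag_info = {
    "dag_id": "test_dag",
    "dag_run_id": "example_run_id",
    "logical_date": "2025-01-01T00:00:00+00:00",
}
check_fixture_values(example_dag_info, str(uuid.uuid4()))
```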
   
   ## Test Requirements
   
   ### Positive Tests
   - Test with valid inputs
   - Assert correct response type
   - Assert expected values in response
   - Use descriptive console output for debugging
   - End each test with a success message: `console.print("[green]✅ Test name passed!")`
   
   ### Negative Tests
   - Test error scenarios (not found, invalid input, conflicts)
   - Assert `ErrorResponse` type
   - Assert correct error code (e.g., `CONNECTION_NOT_FOUND`, 
`DAGRUN_ALREADY_EXISTS`)
   - Verify error message/detail is present
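
The negative-test checks above could be bundled as follows. This is a minimal sketch, not part of the test suite: `FakeErrorType`, `FakeErrorResponse`, and `assert_error_response` are hypothetical stand-ins defined here so the snippet runs on its own. In a real test the response would come from `sdk_client`, and `ErrorResponse` would be imported from `airflow.sdk.execution_time.comms`.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class FakeErrorType(Enum):
    """Hypothetical stand-in for the SDK's error-code enum."""

    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"


@dataclass
class FakeErrorResponse:
    """Hypothetical stand-in exposing the two attributes the tests read."""

    error: FakeErrorType
    detail: Optional[dict] = None


def assert_error_response(response: Any, expected_code: str) -> None:
    """Check response type, error code, and presence of detail, in that order."""
    assert isinstance(response, FakeErrorResponse), f"unexpected type: {type(response).__name__}"
    # str() of an Enum member looks like "FakeErrorType.CONNECTION_NOT_FOUND",
    # so compare the suffix, as the patterns in this guide do.
    assert str(response.error).endswith(expected_code), f"unexpected error: {response.error}"
    assert response.detail, "error detail should not be empty"


not_found = FakeErrorResponse(
    error=FakeErrorType.CONNECTION_NOT_FOUND,
    detail={"conn_id": "non_existent_connection"},
)
assert_error_response(not_found, "CONNECTION_NOT_FOUND")
```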
   
   ## Testing Your Code
   
   ### Run Tests Locally
   
   ```bash
   cd task-sdk-tests
   uv run pytest tests/task_sdk_tests/test_<your_file>.py::test_<specific_test> -v
   ```
   
   ### Run All Tests in a File
   
   ```bash
   uv run pytest tests/task_sdk_tests/test_<your_file>.py -v
   ```
   
   ### Run with Verbose Output
   
   ```bash
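   # -x stops at the first failure, -v prints each test name,
   # -s disables output capture so console.print output is shown
   uv run pytest tests/task_sdk_tests/test_<your_file>.py -xvs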
   ```
   
   ## Example: Complete Implementation
   
   Here's a complete example of implementing `test_connection_get_not_found`:
   
   **Before (stub):**
   ```python
   @pytest.mark.skip(reason="TODO: Implement Connection get (not found) test")
   def test_connection_get_not_found(sdk_client):
       """
       Test getting a non-existent connection.
   
       Expected: ErrorResponse with CONNECTION_NOT_FOUND error
       Endpoint: GET /execution/connections/{conn_id}
       """
       console.print("[yellow]TODO: Implement test_connection_get_not_found")
       raise NotImplementedError("test_connection_get_not_found not implemented")
   ```
   
   **After (implemented):**
   ```python
   def test_connection_get_not_found(sdk_client):
       """
       Test getting a non-existent connection.
   
       Expected: ErrorResponse with CONNECTION_NOT_FOUND error
       Endpoint: GET /execution/connections/{conn_id}
       """
       console.print("[yellow]Getting non-existent connection...")
   
       response = sdk_client.connections.get("non_existent_connection")
   
       console.print(" Connection Get (Not Found) Response ".center(72, "="))
       console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
       console.print(f"[bright_blue]Error Type:[/] {response.error}")
       console.print(f"[bright_blue]Detail:[/] {response.detail}")
       console.print("=" * 72)
   
       assert isinstance(response, ErrorResponse)
       assert str(response.error).endswith("CONNECTION_NOT_FOUND")
       console.print("[green]✅ Connection get (not found) test passed!")
   ```
   
   ## Troubleshooting
   
   ### Test Fails with 404 Not Found
   - Verify the resource exists (for positive tests)
   - Check that you're using the correct IDs/keys
   - Ensure the DAG run is in the correct state
   
   ### Test Fails with Authentication Error
   - Ensure you're using the `sdk_client` fixture (not creating your own)
   - Check that the JWT token is valid
   
   ### Import Errors
   - Check that you're importing from the correct modules:
     - `airflow.sdk.api.datamodels._generated` for response types
     - `airflow.sdk.execution_time.comms` for `OKResponse`, `ErrorResponse`, 
etc.
   
   ## Submitting Your Changes
   
   1. **Ensure all tests pass**: Run the full test suite for your file
   2. **Reference the issue**: Include the issue number in your PR description 
(as related #[issue-number])
   3. **Keep changes focused**: One file per PR is recommended
   
   ## Questions?
   
   - Check existing test files for patterns
   - Review the SDK client code: `task-sdk/src/airflow/sdk/api/client.py`
   - Review the API routes: 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/`
   - Ask in the issue comments or PR discussion
   - Reach out in the SIG channel `#sig-airflow-task-sdk-integration-tests` on 
Airflow Slack
   
   ## Note on the Testing Environment
   
   Tests run against a Docker Compose environment that includes:
   - PostgreSQL database
   - Airflow API server
   - Airflow scheduler
   - Airflow DAG processor
   - Airflow worker
   
   The test fixtures handle:
   - Starting Docker Compose
   - Waiting for services to be ready
   - Creating test DAG runs
   - Authenticating requests
   - Cleaning up after tests
   
   You don't need to manage the Docker environment manually - the fixtures 
handle everything!
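
For intuition about what "waiting for services to be ready" typically involves, here is a generic polling loop. It is an illustrative sketch and not the fixtures' actual implementation: `wait_until` and `ready_on_third_check` are hypothetical, and a real readiness check would probe a service rather than count calls.

```python
import time
from typing import Callable


def wait_until(is_ready: Callable[[], bool], timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll is_ready() until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if is_ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Simulated service that reports ready on the third check.
attempts = {"count": 0}


def ready_on_third_check() -> bool:
    attempts["count"] += 1
    return attempts["count"] >= 3


print(wait_until(ready_on_third_check, timeout=5.0, interval=0.01))
```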
   
   
   
   ### Committer
   
   - [x] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.

