amoghrajesh opened a new issue, #58178:
URL: https://github.com/apache/airflow/issues/58178
### Body
This guide explains how to implement the stub integration tests for the Task
SDK Execution API endpoints.
## Overview
We have placeholder test functions (stubs) with `@pytest.mark.skip`
decorators that need to be implemented. Each stub includes:
- Docstring describing what to test
- Expected response type
- Endpoint information
- `NotImplementedError` placeholder for the time being
The task is to implement these tests following the patterns established in
existing tests.
**Note**: For realistic examples of how these API calls work in practice,
see the actual implementation in `task-sdk/src/airflow/sdk/api/client.py`.
This file contains the `Client` class and operation classes (e.g.,
`TaskInstanceOperations`, `XComOperations`) that show how the SDK makes these
calls,
what parameters they accept, and what they return.
## Getting Started
### Step 1: Choose a Test File
Pick one test file to work on:
- [ ] `test_xcom_operations.py` - XCom operations
- [ ] `test_variable_operations.py` - Variable operations
- [ ] `test_connection_operations.py` - Connection operations
- [ ] `test_dag_run_operations.py` - DAG Run operations
- [ ] `test_hitl_operations.py` - HITL operations
**Per-file assignment is recommended** to avoid merge conflicts when
multiple contributors work in parallel.
Comment on this issue with the file you plan to work on, and it will be
assigned to you.
### Step 2: Understand the Pattern
Study existing implemented tests in the same file or similar files:
- `test_task_instance_operations.py` - Examples:
`test_ti_get_previous_successful_dagrun`, `test_ti_set_rtif`,
`test_ti_heartbeat`
- `test_xcom_operations.py` - Examples: `test_get_xcom`, `test_set_xcom`,
`test_xcom_delete`
- `test_variable_operations.py` - Examples: `test_variable_get`,
`test_variable_get_not_found`
### Step 3: Review the Stub
Each stub includes:
- `@pytest.mark.skip(reason="TODO: ...")` decorator
- Docstring with:
- Description of what to test
- Expected response type
- Endpoint path
- `console.print("[yellow]TODO: ...")` placeholder
- `raise NotImplementedError(...)` placeholder
### Step 4: Implement the Test
1. **Remove the skip decorator**: Delete `@pytest.mark.skip(reason="...")`
2. **Remove the NotImplementedError**: Delete `raise
NotImplementedError(...)`
3. **Update the console.print**: Change from `[yellow]TODO: ...` to
`[yellow]Actual test description...`
4. **Implement the test logic**: Follow the pattern from existing tests
5. **Add assertions**: Verify response type, values, and behavior
## Test Implementation Patterns
### Pattern 1: Simple GET Operation
```python
def test_resource_get(sdk_client, dag_info):
"""Test getting a resource."""
console.print("[yellow]Getting resource...")
response = sdk_client.resources.get("resource_id")
console.print(" Resource Get Response ".center(72, "="))
    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
console.print(f"[bright_blue]Key Field:[/] {response.key_field}")
console.print("=" * 72)
assert isinstance(response, ResourceResponse)
assert response.key_field == "expected_value"
console.print("[green]✅ Resource get test passed!")
```
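The banner lines in these patterns rely on `str.center`, which pads both sides of the title with the fill character up to the requested width; a quick standalone illustration:

```python
# str.center pads the string on both sides with the fill character
# until it reaches the requested width (extra padding goes on the right).
banner = " Resource Get Response ".center(72, "=")
print(banner)
print("=" * 72)  # closing rule, as used in the patterns above
```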
### Pattern 2: GET Operation (Negative Test)
```python
def test_resource_get_not_found(sdk_client):
"""Test getting a non-existent resource."""
console.print("[yellow]Getting non-existent resource...")
response = sdk_client.resources.get("non_existent_id")
console.print(" Resource Get (Not Found) Response ".center(72, "="))
    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
console.print(f"[bright_blue]Error Type:[/] {response.error}")
console.print(f"[bright_blue]Detail:[/] {response.detail}")
console.print("=" * 72)
assert isinstance(response, ErrorResponse)
assert str(response.error).endswith("RESOURCE_NOT_FOUND")
console.print("[green]✅ Resource get (not found) test passed!")
```
### Pattern 3: SET/POST Operation
```python
def test_resource_set(sdk_client):
"""Test setting a resource value."""
console.print("[yellow]Setting resource value...")
test_key = "test_resource_key"
test_value = "test_value"
response = sdk_client.resources.set(test_key, test_value)
console.print(" Resource Set Response ".center(72, "="))
    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
console.print(f"[bright_blue]Status:[/] {response.ok}")
console.print("=" * 72)
assert isinstance(response, OKResponse)
assert response.ok is True
# Verify by getting it back
get_response = sdk_client.resources.get(test_key)
assert get_response.value == test_value
console.print("[green]✅ Resource set test passed!")
```
### Pattern 4: DELETE Operation
```python
def test_resource_delete(sdk_client):
"""Test deleting a resource."""
console.print("[yellow]Deleting resource...")
test_key = "test_resource_delete_key"
# First set it
sdk_client.resources.set(test_key, "to_be_deleted")
# Then delete it
response = sdk_client.resources.delete(test_key)
console.print(" Resource Delete Response ".center(72, "="))
    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
console.print(f"[bright_blue]Status:[/] {response.ok}")
console.print("=" * 72)
assert isinstance(response, OKResponse)
assert response.ok is True
# Verify it's deleted
get_response = sdk_client.resources.get(test_key)
assert isinstance(get_response, ErrorResponse)
console.print("[green]✅ Resource delete test passed!")
```
### Pattern 5: State Transition Operations
```python
def test_ti_succeed(sdk_client, task_instance_id):
"""Test marking a task instance as succeeded."""
console.print("[yellow]Marking task instance as succeeded...")
from airflow.sdk.timezone import utcnow
from airflow.sdk.api.datamodels._generated import TaskOutlets
# Prepare outlets (empty for this example)
task_outlets = TaskOutlets(assets=[], asset_events=[])
outlet_events = []
sdk_client.task_instances.succeed(
id=task_instance_id,
when=utcnow(),
task_outlets=task_outlets,
outlet_events=outlet_events,
rendered_map_index="-1", # Use "-1" for non-mapped tasks
)
console.print(" Task Instance Succeeded ".center(72, "="))
console.print("[bright_blue]Status:[/] Success (204 No Content)")
console.print(f"[bright_blue]Task Instance ID:[/] {task_instance_id}")
console.print("=" * 72)
console.print("[green]✅ Task instance succeed test passed!")
```
## Available Fixtures
### `sdk_client`
Task SDK client for general operations, authenticated against a particular
task instance.
```python
def test_example(sdk_client, dag_info):
response = sdk_client.connections.get("test_connection")
```
### `dag_info`
Dictionary with DAG information:
- `dag_id`: DAG ID (e.g., "test_dag")
- `dag_run_id`: DAG run ID
- `logical_date`: Logical date string
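For orientation, the dictionary has roughly this shape (the values below are illustrative placeholders, not what the real fixture returns at runtime):

```python
# Illustrative shape of the dag_info fixture; the actual values are
# created by the test harness when the DAG run is triggered.
dag_info = {
    "dag_id": "test_dag",
    "dag_run_id": "manual__2024-01-01T00:00:00",
    "logical_date": "2024-01-01T00:00:00+00:00",
}
print(dag_info["dag_id"], dag_info["dag_run_id"])
```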
### `task_instance_id`
UUID string of a running task instance from `test_dag`.
### `sdk_client_for_assets`
SDK client specifically for asset-related tests (use with
`asset_test_setup`).
### `asset_test_setup`
Dictionary with asset information:
- `name`: Asset name
- `uri`: Asset URI
- `dag_run_id`: DAG run ID that produced the asset
## Test Requirements
### Positive Tests
- Test with valid inputs
- Assert correct response type
- Assert expected values in response
- Use descriptive console output for debugging
- Include success message before exit: `console.print("[green]✅ Test name
passed!")`
### Negative Tests
- Test error scenarios (not found, invalid input, conflicts)
- Assert `ErrorResponse` type
- Assert correct error code (e.g., `CONNECTION_NOT_FOUND`,
`DAGRUN_ALREADY_EXISTS`)
- Verify error message/detail is present
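The `endswith` checks in the patterns above are robust when the error field is an enum, because `str()` on a plain enum member yields `ClassName.MEMBER_NAME`. A stand-in enum (not the real SDK error type) shows the idea:

```python
from enum import Enum

# Stand-in for the SDK's error enum; only the str() behaviour matters here.
class ErrorType(Enum):
    CONNECTION_NOT_FOUND = "connection_not_found"
    DAGRUN_ALREADY_EXISTS = "dagrun_already_exists"

err = ErrorType.CONNECTION_NOT_FOUND
print(str(err))  # ErrorType.CONNECTION_NOT_FOUND
assert str(err).endswith("CONNECTION_NOT_FOUND")
```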
## Testing Your Code
### Run Tests Locally
```bash
cd task-sdk-tests
uv run pytest tests/task_sdk_tests/test_<your_file>.py::test_<specific_test> -v
```
### Run All Tests in a File
```bash
uv run pytest tests/task_sdk_tests/test_<your_file>.py -v
```
### Run with Verbose Output
```bash
uv run pytest tests/task_sdk_tests/test_<your_file>.py -xvs
```
## Example: Complete Implementation
Here's a complete example of implementing `test_connection_get_not_found`:
**Before (stub):**
```python
@pytest.mark.skip(reason="TODO: Implement Connection get (not found) test")
def test_connection_get_not_found(sdk_client):
"""
Test getting a non-existent connection.
Expected: ErrorResponse with CONNECTION_NOT_FOUND error
Endpoint: GET /execution/connections/{conn_id}
"""
console.print("[yellow]TODO: Implement test_connection_get_not_found")
    raise NotImplementedError("test_connection_get_not_found not implemented")
```
**After (implemented):**
```python
def test_connection_get_not_found(sdk_client):
"""
Test getting a non-existent connection.
Expected: ErrorResponse with CONNECTION_NOT_FOUND error
Endpoint: GET /execution/connections/{conn_id}
"""
console.print("[yellow]Getting non-existent connection...")
response = sdk_client.connections.get("non_existent_connection")
console.print(" Connection Get (Not Found) Response ".center(72, "="))
    console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
console.print(f"[bright_blue]Error Type:[/] {response.error}")
console.print(f"[bright_blue]Detail:[/] {response.detail}")
console.print("=" * 72)
assert isinstance(response, ErrorResponse)
assert str(response.error).endswith("CONNECTION_NOT_FOUND")
console.print("[green]✅ Connection get (not found) test passed!")
```
## Troubleshooting
### Test Fails with 404 Not Found
- Verify the resource exists (for positive tests)
- Check that you're using the correct IDs/keys
- Ensure the DAG run is in the correct state
### Test Fails with Authentication Error
- Ensure you're using the `sdk_client` fixture (not creating your own)
- Check that the JWT token is valid
### Import Errors
- Check that you're importing from the correct modules:
- `airflow.sdk.api.datamodels._generated` for response types
- `airflow.sdk.execution_time.comms` for `OKResponse`, `ErrorResponse`,
etc.
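A guarded import sketch covering the modules listed above (wrapped in try/except so the snippet also runs in environments where the Task SDK is not installed):

```python
# Module paths taken from this guide; the guard keeps the snippet
# runnable even without the Task SDK installed locally.
try:
    from airflow.sdk.execution_time.comms import ErrorResponse, OKResponse
except ImportError:
    ErrorResponse = OKResponse = None  # Task SDK not available here

print("Task SDK available:", ErrorResponse is not None)
```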
## Submitting Your Changes
1. **Ensure all tests pass**: Run the full test suite for your file
2. **Reference the issue**: Include the issue number in your PR description
(as related #[issue-number])
3. **Keep changes focused**: One file per PR is recommended
## Questions?
- Check existing test files for patterns
- Review the SDK client code: `task-sdk/src/airflow/sdk/api/client.py`
- Review the API routes:
`airflow-core/src/airflow/api_fastapi/execution_api/routes/`
- Ask in the issue comments or PR discussion
- Reach out on the SIG: #sig-airflow-task-sdk-integration-tests channel on
Airflow Slack
## Note on the Testing Environment
Tests run against a Docker Compose environment that includes:
- PostgreSQL database
- Airflow API server
- Airflow scheduler
- Airflow DAG processor
- Airflow worker
The test fixtures handle:
- Starting Docker Compose
- Waiting for services to be ready
- Creating test DAG runs
- Authenticating requests
- Cleaning up after tests
You don't need to manage the Docker environment manually; the fixtures
handle everything.
### Committer
- [x] I acknowledge that I am a maintainer/committer of the Apache Airflow
project.