kalluripradeep opened a new pull request, #61336:
URL: https://github.com/apache/airflow/pull/61336
## Description
This PR implements **DAG-level automatic retry functionality** as requested
in #60866, so that an entire DAG run is retried automatically once all of its
tasks have finished and at least one of them has failed.
## Problem Statement
Currently, Airflow only supports task-level retries. When a DAG fails due to
transient infrastructure issues or temporary unavailability of external
dependencies, users must manually clear and re-run the entire DAG. This PR
introduces automatic DAG-level retry to handle such scenarios.
## Solution
Added two new DAG parameters:
- **`max_dag_retries`** (int, default: 0): Maximum number of times to retry
the entire DAG
- **`dag_retry_delay`** (timedelta, optional): Delay between retry attempts
### Example Usage
```python
from datetime import datetime, timedelta

from airflow.sdk import DAG

dag = DAG(
    "example_dag_retry",
    start_date=datetime(2024, 1, 1),
    max_dag_retries=2,  # Retry the whole DAG run up to 2 times
    dag_retry_delay=timedelta(minutes=5),  # Wait 5 minutes between retries
)
```
## Implementation Details
### 1. **Database Schema Changes**
- Added `dag_try_number` and `dag_max_tries` fields to `dag_run` table
- Added `max_dag_retries` and `dag_retry_delay` fields to `dag` table
- Created Alembic migration: `0101_3_2_0_add_dag_level_retry_fields.py`
### 2. **Retry Logic** (`dagrun.py`)
- Modified `DagRun.update_state()` to check for retry eligibility
- When retry conditions are met:
  - Increment `dag_try_number`
  - Re-queue DagRun with `QUEUED` state
  - Clear only failed task instances (preserve successful ones)
  - Apply `dag_retry_delay` if specified
- Callbacks only fire after all retries are exhausted
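
For reviewers, the decision described above can be sketched in plain Python. This is an illustrative toy model, not the code in `dagrun.py`: `RunSketch`, `maybe_retry`, the `not_before` field and the string states are all hypothetical stand-ins, and it assumes `dag_try_number` starts at 0 and counts the retries already used.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Hypothetical, simplified task states; the real Airflow enums are richer.
FINISHED_STATES = {"success", "failed", "skipped"}


@dataclass
class RunSketch:
    """Toy stand-in for a DAG run; the first two fields mirror the new columns."""

    dag_try_number: int  # assumption: number of DAG-level retries used so far
    dag_max_tries: int   # assumption: copy of max_dag_retries for this run
    state: str = "running"
    not_before: datetime | None = None  # hypothetical: earliest time to re-run


def maybe_retry(run, task_states, now, retry_delay=None):
    """Update `run` in place and return the task ids that would be cleared."""
    # A retry is only considered once every task has finished.
    if not all(s in FINISHED_STATES for s in task_states.values()):
        return []
    failed = sorted(t for t, s in task_states.items() if s == "failed")
    if not failed:
        run.state = "success"
        return []
    if run.dag_try_number >= run.dag_max_tries:
        # Retries exhausted: the run fails for good (callbacks would fire here).
        run.state = "failed"
        return []
    run.dag_try_number += 1
    run.state = "queued"
    if retry_delay is not None:
        run.not_before = now + retry_delay
    # Only failed tasks are cleared; successful ones keep their result.
    return failed


if __name__ == "__main__":
    run = RunSketch(dag_try_number=0, dag_max_tries=2)
    now = datetime(2024, 1, 1, tzinfo=timezone.utc)
    states = {"extract": "success", "load": "failed"}
    print(maybe_retry(run, states, now, timedelta(minutes=5)), run)
```

With `dag_max_tries=0` the function never re-queues, which matches the disabled-by-default behaviour described in this PR.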
### 3. **Key Features**
- ✅ Backward compatible (disabled by default with `max_dag_retries=0`)
- ✅ Only failed tasks are cleared; successful tasks are not re-executed
- ✅ Callbacks suppressed until retries exhausted
- ✅ Retry only triggers when all tasks are complete
- ✅ Configurable retry delay
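
To make the attempt counting and callback suppression concrete, here is a small simulation. It is a sketch under stated assumptions rather than the PR's implementation: `simulate_attempts` is a hypothetical helper, it assumes a run gets at most `1 + max_dag_retries` attempts, and it tracks only whether the failure callback would fire.

```python
def simulate_attempts(max_dag_retries, outcomes):
    """Walk through successive attempts of one toy DAG run.

    `outcomes` holds one bool per attempt: True means every task succeeded,
    False means at least one task failed. Returns a tuple of
    (attempts_run, final_state, failure_callback_fired).
    """
    retries_used = 0
    for attempt, all_succeeded in enumerate(outcomes, start=1):
        if all_succeeded:
            return attempt, "success", False
        if retries_used >= max_dag_retries:
            # No retries left: only now would the failure callback fire.
            return attempt, "failed", True
        retries_used += 1  # failure is absorbed by a DAG-level retry
    raise ValueError("outcomes ended before the run reached a final state")


if __name__ == "__main__":
    # Default (feature disabled): a single failure ends the run.
    print(simulate_attempts(0, [False]))
    # Two retries allowed, third attempt succeeds: no failure callback.
    print(simulate_attempts(2, [False, False, True]))
```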
### 4. **Files Modified**
- `airflow-core/src/airflow/models/dagrun.py` - Core retry logic
- `airflow-core/src/airflow/models/dag.py` - DagModel fields
- `task-sdk/src/airflow/sdk/definitions/dag.py` - SDK DAG parameters
- `airflow-core/src/airflow/migrations/versions/0101_3_2_0_add_dag_level_retry_fields.py` - Migration
- `airflow-core/tests/unit/models/test_dagrun.py` - Added 10 comprehensive
tests
- `airflow-core/docs/core-concepts/dag-run.rst` - Complete documentation
## Testing
Added 10 comprehensive unit tests covering:
- ✅ Basic retry functionality
- ✅ Retry with delay
- ✅ Max retries exhausted
- ✅ Disabled by default
- ✅ Partial failure (tasks still running)
- ✅ Success case (no retry)
- ✅ Selective task clearing
- ✅ Field initialization
- ✅ Callback behavior
- ✅ Edge cases
## Documentation
Added comprehensive documentation in `core-concepts/dag-run.rst` including:
- Feature overview and use cases
- Configuration parameters
- How it works (step-by-step)
- Difference from task-level retry
- Example scenarios
## Breaking Changes
None. Feature is disabled by default (`max_dag_retries=0`).
## Checklist
- [x] Database schema changes with Alembic migration
- [x] Unit tests added (10 tests)
- [x] Documentation updated
- [x] Backward compatible
- [x] Follows existing Airflow patterns
## Related Issue
Fixes #60866
## Screenshots
N/A (Backend feature)
---
**Ready for review!** 🚀