Hi all,

Continuing the push to make Airflow AI-native, I have put together AIP-105: Pluggable Retry Policies.
Wiki: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
PR (core): https://github.com/apache/airflow/pull/65450
PR (LLM-powered, common-ai provider): https://github.com/apache/airflow/pull/65451

The problem is straightforward: Airflow retries every failure the same way. An expired API key gets retried 3 times over 15 minutes. A rate-limited API gets retried immediately, hitting the same 429. Users who want smarter retries today have to wrap every task in try/except and raise AirflowFailException manually, mixing retry logic into business logic.

This AIP adds a retry_policy parameter to BaseOperator. The policy evaluates the actual exception at failure time and returns RETRY (with a custom delay), FAIL (skip remaining retries), or DEFAULT (standard behaviour). It runs in the worker process, not the scheduler.

Declarative example:

```python
from datetime import timedelta

@task(
    retries=5,
    retry_policy=ExceptionRetryPolicy(
        rules=[
            RetryRule(
                exception="requests.exceptions.HTTPError",
                action=RetryAction.RETRY,
                retry_delay=timedelta(minutes=5),
            ),
            RetryRule(
                exception="google.auth.exceptions.RefreshError",
                action=RetryAction.FAIL,
            ),
        ]
    ),
)
def call_api(): ...
```

LLM-powered example -- uses any pydantic-ai provider (OpenAI, Anthropic, Bedrock, Ollama):

```python
@task(retries=5, retry_policy=LLMRetryPolicy(llm_conn_id="my_llm"))
def call_flaky_api(): ...
```

The LLM version classifies errors into categories (auth, rate_limit, network, data, transient, permanent) using structured output, with a 30-second timeout and declarative fallback rules for when the LLM itself is down.

I have attached demo videos and screenshots to both PRs showing both policies running end-to-end in Airflow, including the LLM correctly classifying 4 different error types via Claude Haiku.

Full design, done criteria, and implementation details are in the wiki page above. Feedback welcome.

Thanks,
Kaxil
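P.S. For anyone curious what "pluggable" means in practice before reading the wiki, here is a minimal standalone sketch of the evaluate-the-exception-and-decide shape described above. The class and method names (BaseRetryPolicy, evaluate, RetryDecision) are illustrative placeholders, not the final API from the PRs:

```python
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Optional


class RetryAction(Enum):
    RETRY = "retry"      # retry with a policy-chosen delay
    FAIL = "fail"        # skip remaining retries and fail the task
    DEFAULT = "default"  # fall back to standard Airflow retry behaviour


@dataclass
class RetryDecision:
    action: RetryAction
    retry_delay: Optional[timedelta] = None


class BaseRetryPolicy:
    """Hypothetical base class: the worker calls evaluate() with the
    exception raised by the task, at failure time."""

    def evaluate(self, exc: BaseException) -> RetryDecision:
        return RetryDecision(RetryAction.DEFAULT)


class RateLimitAwarePolicy(BaseRetryPolicy):
    """Illustrative policy: back off on rate limits, fail fast on
    auth errors, defer to defaults otherwise."""

    def evaluate(self, exc: BaseException) -> RetryDecision:
        msg = str(exc).lower()
        if "429" in msg or "rate limit" in msg:
            return RetryDecision(RetryAction.RETRY, retry_delay=timedelta(minutes=5))
        if "401" in msg or "expired" in msg:
            return RetryDecision(RetryAction.FAIL)
        return RetryDecision(RetryAction.DEFAULT)
```

The point is only that the decision is data (action plus optional delay) computed from the live exception, which is what lets policies be swapped per task without touching the task code itself.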
