Sounds very useful! Regarding the LLM-powered case: where do the system prompt or custom user instructions go? The only thing we specify is the connection ID, yet the connection doesn't have a system-prompt field (at least according to https://airflow.apache.org/docs/apache-airflow-providers-common-ai/stable/connections/pydantic_ai.html). So how do we configure the agent to classify into nonstandard categories, or to behave according to our specifications when certain types of errors are encountered?
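Just to make the question concrete, here is roughly the surface I was expecting to find. Everything below is a pure sketch with no Airflow imports -- the class name, the `system_prompt` and `categories` parameters, and `build_prompt` are all hypothetical, not the actual provider API:

```python
from dataclasses import dataclass, field

# Hypothetical sketch only: none of these names are confirmed by the
# common-ai provider docs; this just shows where I'd expect custom
# instructions and nonstandard categories to slot in.
@dataclass
class LLMRetryPolicySketch:
    llm_conn_id: str
    # Where does this actually go in the real policy / connection?
    system_prompt: str = "Classify the error into one of the categories."
    categories: list = field(
        default_factory=lambda: [
            "auth", "rate_limit", "network", "data", "transient", "permanent"
        ]
    )

    def build_prompt(self, exc: BaseException) -> str:
        # The classifier prompt the LLM would see for a given failure.
        return (
            f"{self.system_prompt}\n"
            f"Categories: {', '.join(self.categories)}\n"
            f"Error: {type(exc).__name__}: {exc}"
        )


policy = LLMRetryPolicySketch(
    llm_conn_id="my_llm",
    system_prompt="Treat HTTP 402 from our billing API as quota_exceeded.",
    categories=["auth", "rate_limit", "quota_exceeded", "transient"],
)
print(policy.build_prompt(RuntimeError("402 Payment Required")))
```

Is something like this planned on the policy object itself, or is the prompt meant to live in the connection's Extra field?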
Best,
Dev-iL

On Sat, 18 Apr 2026, 3:02 Kaxil Naik, <[email protected]> wrote:

> Hi all,
>
> Continuing the push to make Airflow AI-native, I have put together
> AIP-105: Pluggable Retry Policies.
>
> Wiki:
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
> PR (core): https://github.com/apache/airflow/pull/65450
> PR (LLM-powered, common-ai provider):
> https://github.com/apache/airflow/pull/65451
>
> The problem is straightforward: Airflow retries every failure the same
> way. An expired API key gets retried 3 times over 15 minutes. A
> rate-limited API gets retried immediately, hitting the same 429. Users
> who want smarter retries today have to wrap every task in try/except
> and raise AirflowFailException manually, mixing retry logic into
> business logic.
>
> This AIP adds a retry_policy parameter to BaseOperator. The policy
> evaluates the actual exception at failure time and returns RETRY (with
> a custom delay), FAIL (skip remaining retries), or DEFAULT (standard
> behaviour). It runs in the worker process, not the scheduler.
>
> Declarative example:
>
> ```python
> @task(
>     retries=5,
>     retry_policy=ExceptionRetryPolicy(
>         rules=[
>             RetryRule(
>                 exception="requests.exceptions.HTTPError",
>                 action=RetryAction.RETRY,
>                 retry_delay=timedelta(minutes=5)
>             ),
>             RetryRule(
>                 exception="google.auth.exceptions.RefreshError",
>                 action=RetryAction.FAIL
>             ),
>         ]
>     ),
> )
> def call_api():
>     ...
> ```
>
> LLM-powered example -- uses any pydantic-ai provider (OpenAI,
> Anthropic, Bedrock, Ollama):
>
> @task(retries=5, retry_policy=(llm_conn_id="my_llm"))
> def call_flaky_api(): ...
>
> The LLM version classifies errors into categories (auth, rate_limit,
> network, data, transient, permanent) using structured output with a
> 30-second timeout and declarative fallback rules for when the LLM
> itself is down.
>
> I have attached demo videos and screenshots to both PRs showing both
> policies running end-to-end in Airflow -- including the LLM correctly
> classifying 4 different error types via Claude Haiku.
>
> Full design, done criteria, and implementation details are in the wiki
> page above.
>
> Feedback welcome.
>
> Thanks,
> Kaxil
