Sounds very useful!

Regarding the llm-powered case: where do the system prompt or custom user
instructions go? The only thing we specified is the connection id, yet the
connection doesn't have a system prompt field (at least according to
https://airflow.apache.org/docs/apache-airflow-providers-common-ai/stable/connections/pydantic_ai.html).
So how do we configure the agent to classify into nonstandard categories,
or to behave according to our specifications when certain types of errors
are encountered?

Best,
Dev-iL

On Sat, 18 Apr 2026, 3:02 Kaxil Naik, <[email protected]> wrote:

> Hi all,
>
> Continuing the push to make Airflow AI-native, I have put together AIP-105:
> Pluggable Retry Policies.
>
> Wiki:
>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
> PR (core): https://github.com/apache/airflow/pull/65450
> PR (LLM-powered, common-ai provider):
> https://github.com/apache/airflow/pull/65451
>
> The problem is straightforward: Airflow retries every failure the same way.
> An expired API key gets retried 3 times over 15 minutes. A rate-limited API
> gets retried immediately, hitting the same 429. Users who want smarter
> retries today have to wrap every task in try/except and raise
> AirflowFailException manually, mixing retry logic into business logic.
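>
> For concreteness, that workaround pattern looks roughly like this (a
> sketch only; AirflowFailException is stubbed below so the snippet is
> self-contained -- in Airflow it comes from airflow.exceptions):
>
> ```python
> # Stand-in for airflow.exceptions.AirflowFailException; raising it tells
> # Airflow to fail the task immediately, skipping any remaining retries.
> class AirflowFailException(Exception):
>     pass
>
>
> class AuthError(Exception):
>     """Placeholder for e.g. an expired-credentials exception."""
>
>
> def call_api():
>     raise AuthError("API key expired")
>
>
> def task_with_manual_retry_logic():
>     # Retry logic tangled into the task body -- the pattern this AIP removes.
>     try:
>         call_api()
>     except AuthError as err:
>         # Retrying an expired key is pointless: fail fast.
>         raise AirflowFailException(str(err)) from err
> ```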
>
> This AIP adds a retry_policy parameter to BaseOperator. The policy
> evaluates the actual exception at failure time and returns RETRY (with a
> custom delay), FAIL (skip remaining retries), or DEFAULT (standard
> behaviour). It runs in the worker process, not the scheduler.
>
> Declarative example:
>
> ```python
> @task(
>     retries=5,
>     retry_policy=ExceptionRetryPolicy(
>         rules=[
>             RetryRule(
>                 exception="requests.exceptions.HTTPError",
>                 action=RetryAction.RETRY,
>                 retry_delay=timedelta(minutes=5),
>             ),
>             RetryRule(
>                 exception="google.auth.exceptions.RefreshError",
>                 action=RetryAction.FAIL,
>             ),
>         ]
>     ),
> )
> def call_api():
>     ...
> ```
>
> LLM-powered example -- uses any pydantic-ai provider (OpenAI, Anthropic,
> Bedrock, Ollama):
>
> ```python
> @task(retries=5, retry_policy=LLMRetryPolicy(llm_conn_id="my_llm"))
> def call_flaky_api(): ...
> ```
>
> The LLM version classifies errors into categories (auth, rate_limit,
> network, data, transient, permanent) using structured output with a
> 30-second timeout and declarative fallback rules for when the LLM itself is
> down.
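>
> To sketch the fallback idea in plain Python (the names here are
> illustrative, not the provider's actual API): if no LLM category arrives
> within the timeout, declarative keyword rules classify the error instead:
>
> ```python
> from dataclasses import dataclass
>
> # Categories the policy treats as worth retrying.
> RETRYABLE = {"rate_limit", "network", "transient"}
>
>
> @dataclass
> class FallbackRule:
>     substring: str  # matched against the exception text (case-insensitive)
>     category: str   # category assumed when the LLM is unavailable
>
>
> FALLBACK_RULES = [
>     FallbackRule("429", "rate_limit"),
>     FallbackRule("timed out", "network"),
>     FallbackRule("invalid api key", "auth"),
> ]
>
>
> def classify(error_text, llm_category=None):
>     """Prefer the LLM's structured-output category; otherwise fall back
>     to the declarative rules, defaulting to 'permanent'."""
>     if llm_category is not None:
>         return llm_category
>     lowered = error_text.lower()
>     for rule in FALLBACK_RULES:
>         if rule.substring in lowered:
>             return rule.category
>     return "permanent"  # conservative default: do not retry blindly
>
>
> def should_retry(error_text, llm_category=None):
>     return classify(error_text, llm_category) in RETRYABLE
> ```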
>
> I have attached demo videos and screenshots to both PRs showing both
> policies running end-to-end in Airflow -- including the LLM correctly
> classifying 4 different error types via Claude Haiku.
>
> Full design, done criteria, and implementation details are in the wiki page
> above.
>
> Feedback welcome.
>
> Thanks,
> Kaxil
>
