Thanks Kaxil, Jarek, and Jens.

I agree this needs an AIP, and after looking into how much work Kaxil has 
already put into AIP-103, I agree it should build on top of that rather than 
ship a parallel subsystem.  I honestly lost track of that AIP and didn't 
realize how much overlap there was and how close it is to completion; it looks 
like it might be done for 3.3.0?  Looking at the implementation that's landed 
on main over the past few weeks, the BaseTaskStateBackend / StateScope / 
retention infrastructure all looks like it's reusable here.  No sense 
duplicating that.

Unless Kaxil wants to claim first rights on this, since he already alluded to
it in his AIP, I'd like to carry this forward as a follow-up AIP.  My plan:

- Write a new AIP which builds on the merged 103 interfaces, proposing a 
CacheScope (or similar) that extends StateScope and reuses the existing 
backend/GC/UI plumbing.
- I'll put more thought into the open questions Kaxil raised (source hash
definition, observability on cache hits, XCom backend interaction, mapped
tasks, deferred tasks, scheduler latency, and manual invalidation as a P0
requirement) and address them in the AIP.
- Drop the cache=True sugar from the initial scope.  The first version would be 
the storage + lookup + scheduler integration layer; the convenience decorator 
can come later once we've learned from real usage (per Jarek's point about 
needing the lower-level primitive first).
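To make the CacheScope idea a bit more concrete, here is a minimal sketch. It
assumes the key tuple Kaxil suggested, (team, dag_id, task_id, source_hash,
input_hash). `StateScope` below is a local stand-in, not the AIP-103 class,
and `CacheScope` and `storage_key()` are hypothetical names for illustration.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class StateScope:
    """Local stand-in for AIP-103's StateScope (hypothetical shape)."""

    def storage_key(self) -> str:
        raise NotImplementedError


@dataclass(frozen=True)
class CacheScope(StateScope):
    """Hypothetical scope keyed by (team, dag_id, task_id, source_hash, input_hash)."""

    team: str
    dag_id: str
    task_id: str
    source_hash: str
    input_hash: str

    def storage_key(self) -> str:
        parts = (self.team, self.dag_id, self.task_id, self.source_hash, self.input_hash)
        # A unit-separator join avoids ambiguity ("a"+"bc" vs "ab"+"c"), and
        # hashing gives a fixed-length key regardless of id lengths.
        digest = hashlib.sha256("\x1f".join(parts).encode()).hexdigest()
        # The prefix keeps cache entries apart from other scopes in the same backend.
        return f"cache/{digest}"
```

Because team is part of the scope, two teams running an identical task get
different storage keys, which is the multi-team isolation we'd want to keep.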

Unless anyone objects or would prefer to take this in a different direction, I 
can start drafting the AIP and bring it back to the list when it's ready for 
discussion.

  - ferruzzi
________________________________
From: Jens Scheffler <[email protected]>
Sent: Saturday, May 23, 2026 4:09 AM
To: [email protected] <[email protected]>
Subject: RE: [EXT] [DISCUSS] Task Result Caching

+1 on the idea and +1 on having it as a core feature. Given all the
details listed below and the reasonable things to consider, an AIP is
needed. But we have many users who would benefit, and as long as it is
opt-in, I think it can start simpler and then grow over time.

Jens

P.S.: Not sure why, but the AWS emails always land in my spam filter,
both private and at work. Not sure what is wrong with the AWS email
setup, but it certainly seems to be the same class of computer science
problem as cache implementations.

On 23.05.26 01:56, Jarek Potiuk wrote:
> * Certainly need an AIP
> * There are two difficult things in Computer Science... You know the drill
> (cache invalidation). And yes - caching is hard. Very. Very. Very.
> Difficult. Especially in a distributed context, with backfillable Dags,
> versioned dags, storing task state etc.
> * I would go even further and push back on even thinking about or
> discussing caching before task state is done, available, and in use.
>
> As Kaxil mentioned, caching and task state have many properties in common,
> but Task State addresses them much more explicitly, and it's lower level.
> The "magical" caching is deceptively simple. With task state, the operator
> author or TaskFlow task author has to deliberately think about how task
> state should be used, avoiding certain traps, whereas "cache=True" means
> that we will have to solve all those cases for users, who will expect
> everything to work as they think and all the edge cases to be solved for
> them. I guess we will only start learning once Task State (which is a
> lower-level primitive) starts being used. With such lower-level solutions,
> edge cases can be worked around temporarily with "if x, do it
> differently". When we have "cache=True", there are no workarounds.
>
> So my proposal is: "Yes. But let's build it on AIP-103, and only after we
> learn some edge cases."
>
> J.
>
>
> On Fri, May 22, 2026 at 11:47 PM Kaxil Naik <[email protected]> wrote:
>
>> Hi Dennis,
>>
>> Thanks for writing this up -- result caching is something I have been
>> thinking about as a follow-up to AIP-103 (hence it is listed as a follow-up
>> AIP in the AIP-103 doc
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>)
>> for the same reasons: ML and reproducible-pipeline workloads. A few thoughts
>> on shape, scope, and how this fits with what's already in flight.
>>
>> *Yes, this needs an AIP*
>>
>> I'd push back on "this doesn't require an AIP". The proposal touches many
>> critical paths, especially the scheduler's execution path (skipping
>> queue/executor on a cache hit).
>>
>> That's a public-surface, cross-cutting change with behaviour that's hard to
>> walk back once shipped. The AIP doesn't have to be heavy -- but a written
>> design that the community can vote on (and that ties down the edge cases
>> below) will save us from "we shipped it experimental and now four people
>> interpret cache semantics differently". The discussion thread alone won't
>> capture the decisions we need to make.
>>
>> *Relationship to AIP-103 (Task State Management)*
>>
>> This is the part I think needs the most thought before you start
>> implementing.
>>
>> AIP-103 is already landing a pluggable, scoped, JWT-authorized state
>> backend with:
>>
>>
>>     - A `BaseTaskStateBackend` interface with sync + async
>>     get/set/delete/clear
>>     - A `StateScope` abstraction (`TaskScope`, `AssetScope` today)
>>     - Retention/GC policy at deployment + operator level
>>     - A UI panel for inspecting/clearing entries
>>     - Execution API endpoints under `/state/...`
>>
>>
>> Result caching needs essentially the same infrastructure: a pluggable
>> backend, retention (TTL is just a different retention policy), UI for
>> inspecting/clearing entries, multi-team isolation, garbage collection. If
>> we ship a parallel `BaseTaskCacheBackend` with its own UI, its own config
>> section, its own retention story, we end up with two near-identical
>> subsystems that diverge over time.
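The point that TTL is just a different retention policy can be illustrated
with a toy in-memory backend. The class name, the sync-only methods (modeled
loosely on the get/set/delete/clear operations listed above), and the `ttl`
argument are assumptions for illustration, not the AIP-103
`BaseTaskStateBackend` API.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class InMemoryStateBackend:
    """Toy backend (hypothetical): a cache TTL is just per-entry retention."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (value, expires_at); expires_at None means "keep until cleared"
        self._data: dict[str, tuple[Any, float | None]] = {}

    def set(self, key: str, value: Any, ttl: float | None = None) -> None:
        expires_at = None if ttl is None else self._clock() + ttl
        self._data[key] = (value, expires_at)

    def get(self, key: str) -> Any | None:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            # Lazy GC: an expired entry is treated as absent and dropped.
            del self._data[key]
            return None
        return value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)

    def clear(self, prefix: str = "") -> None:
        # Clearing by prefix doubles as "invalidate everything for this scope".
        for key in [k for k in self._data if k.startswith(prefix)]:
            del self._data[key]
```

Task state would be stored with `ttl=None` and cache entries with a TTL, so
both can share one backend, one GC path, and one UI.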
>>
>> My strong preference: extend AIP-103's scope model with a `CacheScope` (or
>> similar) keyed by `(team, dag_id, task_id, source_hash, input_hash)`, and
>> reuse the backend abstraction. Result caching then becomes "AIP-103 + a
>> scheduler-side lookup + a TTL semantics layer". That's why caching was
>> already listed as a future follow-up AIP in the AIP-103 doc
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>.
>>
>> *Specific concerns worth pinning down in the AIP*
>>
>> A few things I'd want explicitly answered before voting:
>>
>>     - *Observability on cache hits*. If a TI is marked SUCCESS without a
>>     worker run, what does the UI show? `start_date` / `end_date` /
>>     `duration`? Logs? Does the `dag.task.duration` stat get emitted?
>>     "Skipped because cached" needs to be visually distinct from a real
>>     success -- otherwise debugging "why is my pipeline producing stale
>>     data" gets very hard. I'd argue for a new TI state (`CACHED` or a
>>     sub-state) rather than overloading SUCCESS.
>>     - *Task source hash definition*. You say "computed from the serialized
>>     DAG JSON". The serialized DAG includes `retries`, `pool`, `queue`,
>>     `default_args`, owner, etc. -- none of which affect output. If a user
>>     bumps `retries` from 2 to 3, the cache invalidates. The hash needs a
>>     narrower definition: the callable's bytecode/source + `op_args` +
>>     `op_kwargs` only. Worth specifying exactly what's in the hash.
>>     - *XCom backend interaction.* If the XCom backend is S3 and we're
>>     caching the XCom value, a cache hit pushes the *same* pointer to the new
>>     TI's XCom row. Is that pointer still alive? Do we re-validate on hit? Do
>>     we copy the underlying object? This matters when the XCom backend has
>>     its own retention separate from the cache TTL.
>>     - *Mapped tasks.* Is the cache key per `map_index`? What happens on
>>     partial hits (3 of 10 mapped tasks hit cache, 7 miss)? Does the executor
>>     still get N slots reserved? This needs to be in the design, not "we'll
>>     figure it out".
>>     - *Deferred / async tasks.* Cache the final result, or skip the deferral
>>     entirely? What about sensors a user has explicitly opted into caching --
>>     that's a foot-gun and I'd consider blocking it outright with a `cache`
>>     param on `BaseSensorOperator` that raises if set to `True`, rather than
>>     relying on "free" inheritance.
>>     - *Determinism assumption.* This is opt-in, so technically user-owned.
>>     But a `@task` calling `datetime.utcnow()` or hitting a non-idempotent
>>     API will silently produce wrong results across runs once cached. Worth a
>>     prominent doc warning and maybe a static-analysis lint in
>>     `airflow dags reserialize` that flags obviously non-deterministic
>>     callables.
>>     - *Scheduler critical-path latency.* 500ms fail-open timeout * N
>>     cache-eligible tasks per scheduler loop = a noticeable scheduler stall
>>     if the backend degrades. The lookup needs to be async/batched, and we
>>     should be explicit about what happens to scheduling throughput when the
>>     cache backend is slow but not failing. Otherwise the "we avoid 100ms
>>     subprocess overhead" win gets eaten by lookup overhead in the steady
>>     state. It can also cause a worse effect that we want to avoid --
>>     especially with a pluggable backend that runs on the scheduler, we end
>>     up running user code there.
>>     - *Manual invalidation.* "I'm not sure on exactly how that would work"
>>     -- this is a hard requirement, not a follow-up. Operators will need to
>>     clear the cache for a task/dag after a bug fix, schema change, or
>>     upstream data correction. Needs a CLI command
>>     (`airflow tasks clear-cache ...`) and a UI button, both authz'd. Without
>>     it, a stale cached result is unrecoverable except by waiting for TTL or
>>     changing the source.
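One possible reading of the narrower source-hash definition (callable source
plus `op_args`/`op_kwargs`, nothing else) as a sketch. `task_source_hash` is a
hypothetical helper; it assumes JSON-serializable arguments, and whether to
hash source text or bytecode is exactly the open question, so this version
prefers source and falls back to bytecode only when source is unavailable.

```python
from __future__ import annotations

import hashlib
import inspect
import json
import textwrap
from typing import Any, Callable, Sequence


def _callable_fingerprint(fn: Callable[..., Any]) -> str:
    try:
        # Prefer the source text: it is stable across Python versions.
        return textwrap.dedent(inspect.getsource(fn))
    except (OSError, TypeError):
        # No source available (e.g. defined in a REPL): fall back to bytecode,
        # constants, and referenced names. Not stable across Python versions,
        # nor across processes if the body contains nested functions.
        code = fn.__code__
        return code.co_code.hex() + repr(code.co_consts) + repr(code.co_names)


def task_source_hash(
    fn: Callable[..., Any],
    op_args: Sequence[Any] = (),
    op_kwargs: dict[str, Any] | None = None,
) -> str:
    """Hypothetical: hash only what can change the task's output.

    Deliberately ignores retries, pool, queue, owner, default_args, etc.
    """
    payload = json.dumps(
        {
            "source": _callable_fingerprint(fn),
            "op_args": list(op_args),
            "op_kwargs": op_kwargs or {},
        },
        sort_keys=True,  # kwarg order must not change the hash
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```

With this definition, bumping `retries` from 2 to 3 leaves the hash untouched,
while editing the function body or changing an argument produces a new hash.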
>>
>>
>> re prior art: Flyte's design is a reasonable reference but their execution
>> model is materially different -- their tasks are containers with declared
>> inputs/outputs afaik, so source-hash and input-hash are first-class. We
>> have neither, which is why source-hash definition (above) matters so much.
>>
>> The other thing I have been playing around with is caching parts of task
>> execution rather than entire task results. Nothing certain yet, but I'm
>> experimenting.
>>
>> Cheers,
>> Kaxil
>>
>> On Fri, 22 May 2026 at 18:14, Ferruzzi, Dennis <[email protected]>
>> wrote:
>>
>>> Hi all,
>>>
>>> I've heard a few people discussing the idea, and with the
>>> Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on
>>> Task Result Caching as my next project.  I'm looking for feedback before
>>> I start implementation.
>>>
>>> The short version: for ML and other repetitive workloads, tasks often run
>>> with identical inputs and produce identical outputs across Dag runs.
>>> Currently we spawn a subprocess every time regardless.  I'd like to add an
>>> optional, per-task caching mechanism that lets the scheduler
>>> short-circuit execution when a valid cached result already exists.  No
>>> subprocess spawned, no executor slot consumed.
>>>
>>> I don't feel like this requires an AIP, but I'm happy to write one if the
>>> community feels otherwise.
>>>
>>> THE BASICS
>>>
>>> Users opt in per-task:
>>>
>>>        @task(cache=True)
>>>        def train_model(data_path: str): ...
>>>
>>>        @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
>>>        def fetch_data(account_id: str, date_range: str): ...
>>>
>>>        # Classic operators
>>>        PythonOperator(..., cache=True)
>>>
>>> I propose a new "cache" parameter to BaseOperator:
>>>        cache: CacheConfig | bool | None = None
>>>
>>> cache=True is sugar for CacheConfig() with all defaults.  Tasks without
>>> cache set are completely unaffected; the scheduler skips the check
>>> entirely.
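The `cache: CacheConfig | bool | None` parameter implies a small normalization
step. A minimal sketch, assuming `CacheConfig` is a frozen dataclass with the
`ttl`, `exclude_inputs`, and `include_inputs` fields mentioned in this
proposal; the defaults and the `resolve_cache_config` helper are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CacheConfig:
    """Hypothetical shape of the proposed CacheConfig."""

    ttl: int | None = None  # seconds; None means "use the global default"
    exclude_inputs: tuple[str, ...] = ()
    include_inputs: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        # Accept lists (as in exclude_inputs=["account_id"]) but store tuples.
        object.__setattr__(self, "exclude_inputs", tuple(self.exclude_inputs))
        object.__setattr__(self, "include_inputs", tuple(self.include_inputs))


def resolve_cache_config(cache: CacheConfig | bool | None) -> CacheConfig | None:
    """Return the effective config, or None if caching is off for the task."""
    if cache is None or cache is False:
        return None  # the scheduler skips the cache check entirely
    if cache is True:
        return CacheConfig()  # cache=True is sugar for all defaults
    if isinstance(cache, CacheConfig):
        return cache
    raise TypeError(f"cache must be CacheConfig, bool or None, not {type(cache).__name__}")
```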
>>>
>>>
>>> HOW IT WORKS
>>>
>>>    1. Scheduler is about to queue a cache-enabled task.
>>>    2. Computes a cache key from the task's identity and inputs.
>>>    3. Checks the cache backend (with a fail-open timeout).
>>>    4. Cache hit: mark SUCCESS, push cached result to XCom, done.
>>>    5. Cache miss or timeout: execute normally, store result afterward.
>>>
>>> The cache check happens in the scheduler before spawning the subprocess.
>>> This is the whole point: on cache hits we avoid the subprocess overhead
>>> entirely (estimated at 100ms+; I don't have hard numbers on this).  If the
>>> cache backend is slow or unreachable, we time out and execute normally.
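The fail-open lookup, combined with the suggestion in the reply above to batch
lookups, could look roughly like this. It is a hypothetical sketch:
`fetch_many` stands in for whatever batched async read a backend might expose,
and a timeout or backend error is treated as a miss for every key, so those
tasks simply run normally.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def lookup_cache_fail_open(
    fetch_many: Callable[[list[str]], Awaitable[dict[str, Any]]],
    keys: list[str],
    timeout: float = 0.5,
) -> dict[str, Any]:
    """Return {key: cached_value} for hits; any key not returned is a miss."""
    if not keys:
        return {}
    try:
        # One batched round trip per scheduler loop, bounded by one timeout,
        # instead of N sequential lookups that each get their own timeout.
        return await asyncio.wait_for(fetch_many(keys), timeout=timeout)
    except asyncio.TimeoutError:
        return {}  # fail open: backend too slow, so every task runs normally
    except Exception:
        return {}  # fail open: backend unreachable or erroring
```

Bounding the whole batch with a single timeout means a degraded backend costs
at most one timeout per scheduler loop rather than one per cache-enabled task.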
>>>
>>>
>>> CACHE KEY STRUCTURE
>>>
>>> The cache key is a hash of:
>>>
>>>    - Always included (system-managed): team_name, dag_id, task_id, task
>>>      source hash
>>>    - Included by default: all kwargs passed to the task
>>>    - User-subtractable via exclude_inputs: kwargs irrelevant to the result
>>>    - User-addable via include_inputs: context fields like dag_run.conf keys
>>>
>>> The task source hash is computed on the fly from the serialized Dag JSON
>>> (already loaded in the scheduler), and calculating it shouldn't take more
>>> than a couple of microseconds.  If the task's code changes, the hash
>>> changes and the cached result is invalid.
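A sketch of the key composition described above: system-managed fields are
always included, all task kwargs by default, minus `exclude_inputs`, plus
selected `dag_run.conf` keys via `include_inputs`. The `build_cache_key` name,
the `conf.<key>` naming for context fields, and JSON canonicalization are
assumptions for illustration.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any


def build_cache_key(
    *,
    team_name: str,
    dag_id: str,
    task_id: str,
    source_hash: str,
    task_kwargs: dict[str, Any],
    dag_run_conf: dict[str, Any] | None = None,
    exclude_inputs: tuple[str, ...] = (),
    include_inputs: tuple[str, ...] = (),
) -> str:
    """Hypothetical cache key; assumes all inputs are JSON-serializable."""
    conf = dag_run_conf or {}
    # Included by default: every task kwarg not explicitly excluded.
    inputs = {k: v for k, v in task_kwargs.items() if k not in exclude_inputs}
    # User-addable context fields; the "conf." prefix cannot clash with a
    # kwarg name, since Python identifiers cannot contain dots.
    for name in include_inputs:
        inputs[f"conf.{name}"] = conf.get(name)
    payload = {
        # Always included and never user-controlled; team_name comes from the
        # bundle-to-team association, so keys cannot collide across teams.
        "system": [team_name, dag_id, task_id, source_hash],
        "inputs": inputs,
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()
```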
>>>
>>>
>>> CACHE INVALIDATION
>>>
>>>    - include_inputs / exclude_inputs determine identity ("Are these the
>>> same inputs?")
>>>    - TTL determines validity ("Are these results still fresh?")
>>>
>>> A cache hit requires both.  TTL is configurable with a global default and
>>> per-task override.  Manual invalidation may also be available or may be a
>>> future addition, I'm not sure on exactly how that would work.
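The two-part rule (same identity and still fresh) reduces to a small
predicate. This sketch assumes a stored entry records the key it was written
under and its creation time, and that a per-task TTL overrides the global
default; `CacheEntry` and `is_cache_hit` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class CacheEntry:
    key: str  # identity: hash of system fields + selected inputs
    created_at: datetime  # timezone-aware UTC


def is_cache_hit(
    entry: CacheEntry | None,
    key: str,
    now: datetime,
    task_ttl: timedelta | None,
    global_ttl: timedelta,
) -> bool:
    if entry is None or entry.key != key:
        return False  # identity check: not the same inputs
    ttl = task_ttl if task_ttl is not None else global_ttl  # per-task override
    return now - entry.created_at < ttl  # validity check: still fresh
```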
>>>
>>>
>>> STORAGE BACKEND
>>>
>>> Pluggable interface with a metadata DB default.  Same pattern as XCom
>>> backends, secrets backends, etc.  Providers can implement alternatives
>>> (S3, Redis, dedicated DB).
>>>
>>>
>>> MULTI-TEAM ISOLATION
>>>
>>> team_name is pulled from the Dag's bundle-to-team association, not
>>> user-provided.  It will be baked into the cache key hash and stored as a
>>> DB column for query filtering.  When multi_team is enabled, no cross-team
>>> cache hits are possible.
>>>
>>>
>>> TASK TYPE SUPPORT
>>>
>>>    - Operators and Transfers: cacheable, opt-in
>>>    - Sensors: BaseSensorOperator defaults to cache=False since the point
>>>      of a sensor is to check the world right now, not return a stored
>>>      answer.  Users could explicitly re-enable it on a per-sensor basis if
>>>      they want to for some reason since BaseSensorOperator inherits
>>>      BaseOperator, but that's a "free" feature and not something extra I
>>>      intend to put time into supporting or explicitly blocking.
>>>    - Waiters and Notifiers: not applicable (don't inherit BaseOperator)
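Kaxil's reply above suggests blocking caching on sensors outright rather than
leaving it as an inherited "free" feature. A sketch of that guard, using local
stand-in classes (not Airflow's real `BaseOperator` / `BaseSensorOperator`)
reduced to the proposed `cache` parameter:

```python
from __future__ import annotations

from typing import Any


class BaseOperator:
    """Stand-in for Airflow's BaseOperator, reduced to the proposed param."""

    def __init__(self, *, task_id: str, cache: Any = None, **kwargs: Any) -> None:
        self.task_id = task_id
        self.cache = cache


class BaseSensorOperator(BaseOperator):
    """Stand-in sensor that rejects caching instead of inheriting it."""

    def __init__(self, *, cache: Any = False, **kwargs: Any) -> None:
        # A sensor exists to check the world right now; a cached answer defeats it.
        if cache not in (None, False):
            raise ValueError(
                f"{type(self).__name__} does not support result caching (got cache={cache!r})"
            )
        super().__init__(cache=False, **kwargs)
```

The trade-off is flexibility versus safety: raising removes the foot-gun, at
the cost of the per-sensor opt-in described in the proposal.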
>>>
>>> ADMIN CONTROLS
>>>
>>>    - Global kill switch: [core] task_caching_enabled
>>>    - Global TTL default in airflow.cfg
>>>    - Cache backend class path: [core] task_cache_backend
>>>    - Fail-open timeout (default 500ms?)
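These admin controls map naturally onto `airflow.cfg` options. A minimal
sketch using the standard-library `configparser` and `importlib` rather than
Airflow's own config machinery: `task_caching_enabled` and
`task_cache_backend` are the option names proposed above, while
`task_cache_lookup_timeout_ms`, the defaults, and `load_cache_settings` are
hypothetical.

```python
from __future__ import annotations

import configparser
import importlib
from dataclasses import dataclass


@dataclass(frozen=True)
class CacheSettings:
    enabled: bool
    backend_class: type | None
    lookup_timeout_s: float


def import_class(path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def load_cache_settings(cfg_text: str) -> CacheSettings:
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    enabled = parser.getboolean("core", "task_caching_enabled", fallback=False)
    if not enabled:
        # Global kill switch: don't even import the backend.
        return CacheSettings(enabled=False, backend_class=None, lookup_timeout_s=0.0)
    # Required when enabled in this sketch (no default class path is assumed).
    backend_path = parser.get("core", "task_cache_backend")
    timeout_ms = parser.getint("core", "task_cache_lookup_timeout_ms", fallback=500)
    return CacheSettings(
        enabled=True,
        backend_class=import_class(backend_path),
        lookup_timeout_s=timeout_ms / 1000,
    )
```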
>>>
>>> PRIOR ART
>>>
>>> I could not find any prior discussion of this in the Airflow community,
>>> but I checked Prefect and Flyte; both of them have task caching.  This
>>> design is closest to Flyte's (opt-in, source-code-aware, project/domain
>>> isolation) with stronger multi-team isolation and TTL support.
>>>
>>>
>>> LAUNCH PLAN
>>>
>>> This would launch as an experimental feature behind a feature flag.
>>>
>>>
>>> OPEN QUESTIONS
>>>
>>>    - Does this need an AIP?
>>>    - Any concerns with adding a "cache" param to BaseOperator?
>>>    - Thoughts on the cache key structure (include/exclude approach)?
>>>    - Anything I'm missing?
>>>
>>>    Looking forward to your thoughts.
>>>
>>>    - ferruzzi
>>>
