This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42aeb4b398c Documentation for ResumableJobMixin and resumable tasks 
(#68136)
42aeb4b398c is described below

commit 42aeb4b398c71c2162055fb9f02b7f5554be41d8
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 8 14:53:37 2026 +0530

    Documentation for ResumableJobMixin and resumable tasks (#68136)
---
 .../docs/authoring-and-scheduling/deferring.rst    |   3 +
 airflow-core/docs/core-concepts/index.rst          |   1 +
 .../docs/core-concepts/resumable-tasks.rst         | 187 +++++++++++++++++++++
 task-sdk/docs/deferred-vs-async-operators.rst      |   9 +
 task-sdk/docs/index.rst                            |   1 +
 task-sdk/docs/resumable-job-mixin.rst              | 167 ++++++++++++++++++
 6 files changed, 368 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst 
b/airflow-core/docs/authoring-and-scheduling/deferring.rst
index 95c7c2b0b47..8a074e4d66e 100644
--- a/airflow-core/docs/authoring-and-scheduling/deferring.rst
+++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst
@@ -34,6 +34,9 @@ This is where *Deferrable Operators* can be used. When it has 
nothing to do but
 
    For guidance on when to use deferred operators versus async tasks,
    see `Deferred vs Async Operators 
<https://airflow.apache.org/docs/task-sdk/stable/deferred-vs-async-operators.html>`__.
+   For guidance on when to use deferrable operators versus resumable tasks
+   (crash-safe synchronous operators that use the task state store), see
+   :ref:`Resumable Tasks <concepts-resumable-tasks>`.
 
 An overview of how this process works:
 
diff --git a/airflow-core/docs/core-concepts/index.rst 
b/airflow-core/docs/core-concepts/index.rst
index 5e53609d8d8..c3a9a8694f5 100644
--- a/airflow-core/docs/core-concepts/index.rst
+++ b/airflow-core/docs/core-concepts/index.rst
@@ -39,6 +39,7 @@ Here you can find detailed documentation about each one of 
the core concepts of
     operators
     sensors
     taskflow
+    resumable-tasks
     executor/index
     auth-manager/index
     multi-team
diff --git a/airflow-core/docs/core-concepts/resumable-tasks.rst 
b/airflow-core/docs/core-concepts/resumable-tasks.rst
new file mode 100644
index 00000000000..6371c0ba7c8
--- /dev/null
+++ b/airflow-core/docs/core-concepts/resumable-tasks.rst
@@ -0,0 +1,187 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts-resumable-tasks:
+
+Resumable Tasks
+===============
+
+.. versionadded:: 3.3.0
+
+Many data engineering workflows involve submitting work to an external system 
and waiting for it
+to complete. A Spark job, a BigQuery query, a Kubernetes batch pod, an EMR 
step: these are all
+tasks where the real work happens outside Airflow, and the operator's job is 
mostly to submit,
+poll, and collect the result.
+
+These tasks share a common failure mode. With a classic synchronous operator,
+the worker slot is held for the entire polling duration, and if the worker
+process is restarted or the host is preempted, the task retries from scratch
+and loses all progress. Depending on the operator, the external job is then
+submitted again, creating a duplicate run in the external system.
+
+Airflow recommends three approaches for handling long-running external work. 
Understanding the trade-offs
+between them helps you choose the right one for your situation.
+
+.. _concepts-resumable-tasks-deferrable:
+
+Deferrable Operators
+--------------------
+
+A deferrable operator pauses itself at the point where it would otherwise 
start polling, hands
+the polling work to the Triggerer component, and releases its worker slot. 
When the external
+condition is met, the Triggerer wakes the task and the worker resumes from 
where the operator
+left off.
+
+This is the most resource-efficient option. A single Triggerer process can 
concurrently watch
+thousands of conditions, so the rest of the worker pool stays free for other 
tasks.
+
+The trade-offs are:
+
+* A Triggerer component must be running. Deployments that do not include a 
Triggerer cannot use this pattern.
+* Writing a custom deferrable operator requires implementing a dedicated 
``Trigger`` class in
+  addition to the operator itself.
+* The polling logic runs inside the Triggerer's async event loop. Blocking 
calls inside a
+  Trigger stall the entire Triggerer process.
+
+If a deferrable operator already exists for your use case, or your team is
+comfortable implementing one, this is the recommended path because of its
+resource efficiency.
+
+For more details, see :doc:`../authoring-and-scheduling/deferring`.
+
+.. _concepts-resumable-tasks-resumable:
+
+Resumable Tasks
+---------------
+
+A resumable task uses the task state store to persist checkpoints as it makes
+progress. On retry, the task reads the latest checkpoint and continues from
+where it left off rather than starting over.
+
+The worker slot is held for the full duration of the task, the same as a 
standard synchronous
+operator. The benefit is crash safety and continuity, not resource efficiency.
+
+Resumable tasks are useful when:
+
+* No deferrable operator exists for your external system and writing one is 
not practical.
+* You want crash recovery without operating a Triggerer.
+* The task processes work incrementally (for example, reading files from a 
list or paginating
+  through API results) and should be able to resume from its last completed 
batch.
+
+**General pattern**
+
+The task reads a checkpoint from ``task_store`` at the start, does some work, 
writes an updated
+checkpoint, and either continues or finishes. On the next run (whether due to 
a retry after a
+crash or a deliberate reschedule), it reads the checkpoint again and picks up 
from there.
+
+.. code-block:: python
+
+    from airflow.sdk import dag, task
+
+
+    @dag(schedule=None)
+    def process_files_dag():
+
+        @task(retries=5)
+        def process_files(context=None):
+            task_store = context["task_store"]
+            files = ["a.csv", "b.csv", "c.csv", "d.csv"]
+
+            last_processed = task_store.get("last_processed")
+            start_index = 0
+            if last_processed is not None:
+                start_index = files.index(last_processed) + 1
+
+            for file in files[start_index:]:
+                # ... process the file ...
+                task_store.set("last_processed", file)
+
+        process_files()
+
+
+    process_files_dag()
+
+This pattern needs no additional setup and relies only on ``context``. The
+state store is a key-value store scoped to the task instance, and what you
+checkpoint is up to you.
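
The resume behaviour of the general pattern can be tried outside Airflow. The sketch below is only an illustration: ``InMemoryTaskStore`` and the ``crash_after`` parameter are hypothetical stand-ins invented for this example, not part of the Airflow API, and the store is assumed to return ``None`` for a missing key.

```python
class InMemoryTaskStore:
    """Hypothetical stand-in for the task state store: a plain key-value dict."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


def process_files(task_store, files, processed, crash_after=None):
    """Process files in order, writing a checkpoint after each one."""
    last_processed = task_store.get("last_processed")
    start_index = 0 if last_processed is None else files.index(last_processed) + 1

    for count, name in enumerate(files[start_index:], start=1):
        processed.append(name)  # stands in for the real per-file work
        task_store.set("last_processed", name)
        if crash_after is not None and count == crash_after:
            raise RuntimeError("simulated worker crash")


store = InMemoryTaskStore()
files = ["a.csv", "b.csv", "c.csv", "d.csv"]
processed = []

try:
    # First attempt: the simulated crash happens after two files.
    process_files(store, files, processed, crash_after=2)
except RuntimeError:
    pass

# Retry: reads the checkpoint and continues with the third file.
process_files(store, files, processed)
print(processed)
```

Each file is processed exactly once here because the checkpoint is written right after the work for that file. If a crash fell between the work and the checkpoint write, that one file would be processed again on retry, so the per-file work should be idempotent.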
+
+**Resumable operators for external jobs**
+
+When the task submits a job to an external system and then polls for completion,
+there is an additional problem: on retry, the task would submit a second job
+even though the first one may still be running. Rather than handling this
+manually, use :class:`~airflow.sdk.ResumableJobMixin`, which persists the
+external job identifier before polling starts and reconnects to the existing
+job on retry instead of submitting a new one.
+
+For more details and a working example, see 
:class:`~airflow.sdk.ResumableJobMixin`.
+
+.. _concepts-resumable-tasks-async:
+
+Asynchronous Tasks
+------------------
+
+.. note::
+
+   Async task support applies to Python tasks only: ``@task`` decorated 
``async def`` functions
+   and class-based operators that subclass ``BaseAsyncOperator``. It is not 
available for
+   other operator types.
+
+Python tasks support ``async``/``await`` syntax. When the decorated callable 
is an async
+function, Airflow runs it inside an event loop, which lets you fan out many 
concurrent I/O
+operations (HTTP requests, database queries, file reads) within a single task 
execution without
+blocking the event loop while waiting for each one.
+
+The worker slot is held for the full duration of the task. Async tasks are not 
designed for
+long external waits or crash recovery; they are designed for high-throughput 
I/O work that
+completes within a single execution.
+
+For guidance on when to use async tasks versus deferrable operators, see
+:doc:`task-sdk:deferred-vs-async-operators`.
+
+.. _concepts-resumable-tasks-comparison:
+
+Comparison
+----------
+
+.. list-table::
+   :header-rows: 1
+
+   * - Characteristic
+     - Deferrable operator
+     - Resumable task
+     - Async task
+   * - Worker slot during external wait
+     - Freed
+     - Held
+     - Held
+   * - Requires Triggerer
+     - Yes
+     - No
+     - No
+   * - Handles crash recovery
+     - Yes (via Triggerer)
+     - Yes (via task store checkpoint)
+     - No
+   * - Prevents duplicate job submission
+     - Depends on operator implementation
+     - Yes (with ``ResumableJobMixin``)
+     - Not applicable
+   * - Suitable for concurrent I/O fan-out
+     - No
+     - No
+     - Yes
+   * - Available from
+     - Airflow 2.2
+     - Airflow 3.3
+     - Airflow 3.2
diff --git a/task-sdk/docs/deferred-vs-async-operators.rst 
b/task-sdk/docs/deferred-vs-async-operators.rst
index 4d77deea81d..466060ab3a4 100644
--- a/task-sdk/docs/deferred-vs-async-operators.rst
+++ b/task-sdk/docs/deferred-vs-async-operators.rst
@@ -25,6 +25,15 @@ Deferred vs Async Operators
 Airflow 3.2 introduces Python-native async support for tasks, allowing 
concurrent I/O within a single worker slot.
 This page explains how async operators differ from deferred operators and when 
to use each.
 
+.. versionchanged:: 3.3.0
+
+   Airflow 3.3 adds a third pattern for long-running tasks: resumable tasks, 
which use the
+   task state store to checkpoint progress across retries. For guidance on 
when to use
+   resumable tasks versus deferrable operators, see
+   :ref:`apache-airflow:concepts-resumable-tasks`. For details on the
+   :class:`~airflow.sdk.ResumableJobMixin` for external job crash recovery, see
+   :doc:`resumable-job-mixin`.
+
 Deferred Operators
 ------------------
 
diff --git a/task-sdk/docs/index.rst b/task-sdk/docs/index.rst
index 7e3467c14ff..18e215d4ce7 100644
--- a/task-sdk/docs/index.rst
+++ b/task-sdk/docs/index.rst
@@ -173,6 +173,7 @@ For the full public API reference, see the :doc:`api` page.
   examples
   dynamic-task-mapping
   deferred-vs-async-operators
+  resumable-job-mixin
   api
   concepts
   executable-bundle-spec
diff --git a/task-sdk/docs/resumable-job-mixin.rst 
b/task-sdk/docs/resumable-job-mixin.rst
new file mode 100644
index 00000000000..32da769bb6a
--- /dev/null
+++ b/task-sdk/docs/resumable-job-mixin.rst
@@ -0,0 +1,167 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _sdk-resumable-job-mixin:
+
+ResumableJobMixin
+=================
+
+.. versionadded:: 3.3.0
+
+:class:`~airflow.sdk.ResumableJobMixin` is a mixin for operators that submit
+long-running jobs to an external system and poll for their completion. It makes
+the operator crash-safe by persisting the external job identifier to the task
+state store before polling begins. If the worker is restarted or the host is
+preempted, the next retry reconnects to the already-running job instead of
+submitting a duplicate.
+
+This mixin is not a replacement for deferrable operators. Deferrable operators 
free the
+worker slot during polling and are the recommended approach when a Triggerer 
is available.
+Use this mixin when you want crash safety on an existing synchronous operator 
without
+migrating to the deferrable pattern, or when your deployment does not include 
a Triggerer.
+
+For guidance on choosing between deferrable, resumable, and async approaches, 
see
+:ref:`apache-airflow:concepts-resumable-tasks`.
+
+Interface
+---------
+
+Subclasses must implement these six methods:
+
+.. code-block:: python
+
+    def submit_job(self, context: Context) -> JsonValue: ...
+    def get_job_status(self, external_id: JsonValue, context: Context) -> str: ...
+    def is_job_active(self, status: str) -> bool: ...
+    def is_job_succeeded(self, status: str) -> bool: ...
+    def poll_until_complete(self, external_id: JsonValue, context: Context) -> None: ...
+    def get_job_result(self, external_id: JsonValue, context: Context) -> Any: ...
+
+.. _sdk-resumable-job-mixin-implementing:
+
+Implementing the mixin
+----------------------
+
+Inherit from :class:`~airflow.sdk.ResumableJobMixin` in your operator class, then
+call ``execute_resumable(context)`` from your ``execute`` method. The mixin
+requires you to implement six methods that describe how to interact with your
+external system:
+
+``submit_job(context)``
+    Submit the job and return its external identifier. The returned value is 
stored in
+    ``task_store`` and passed back to the other methods on retry. Return 
``None`` only if
+    the external system does not provide a trackable identifier; in that case 
the mixin
+    cannot provide crash safety and will resubmit on every retry.
+
+``get_job_status(external_id, context)``
+    Query the external system for the current job status and return the raw
+    status string it reports. This method is called on retry to determine
+    whether the job is still running, succeeded, or failed.
+
+``is_job_active(status)``
+    Return ``True`` if the job is still running and can be reconnected to. 
``status`` is the
+    raw string returned by ``get_job_status``, a backend-specific value from 
the external
+    system, not an Airflow state::
+
+        def is_job_active(self, status: str) -> bool:
+            return status in ("RUNNING", "PENDING", "ACCEPTED")
+
+``is_job_succeeded(status)``
+    Return ``True`` if the job completed successfully. ``status`` is the same 
raw string
+    from the external system::
+
+        def is_job_succeeded(self, status: str) -> bool:
+            return status == "SUCCEEDED"
+
+``poll_until_complete(external_id, context)``
+    Block until the job reaches a terminal state. Raise on failure.
+
+``get_job_result(external_id, context)``
+    Return the job result after successful completion. Return ``None`` if not 
applicable.
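
As a rough illustration of what a ``poll_until_complete`` implementation might do, the sketch below loops on a status callable until the job leaves its active states and raises on failure. It is a standalone function, not the mixin method: the callables, the ``JobFailedError`` exception, the ``interval`` parameter, and the returned status are all assumptions made for this example.

```python
import time


class JobFailedError(Exception):
    """Raised when the external job ends in a non-successful terminal state."""


def poll_until_complete(get_status, is_active, is_succeeded, interval=0.0, sleep=time.sleep):
    """Block until the status is terminal; return it on success, raise otherwise."""
    while True:
        status = get_status()
        if not is_active(status):
            break
        sleep(interval)  # wait before asking the external system again
    if not is_succeeded(status):
        raise JobFailedError(f"job ended with status {status!r}")
    return status


# Simulated external system: three successive status responses.
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
final = poll_until_complete(
    get_status=lambda: next(statuses),
    is_active=lambda s: s in ("RUNNING", "PENDING", "ACCEPTED"),
    is_succeeded=lambda s: s == "SUCCEEDED",
)
print(final)
```

In a real operator the interval would normally be several seconds, and the status check would go through the hook for the external system.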
+
+How it works
+------------
+
+On the first run, after ``submit_job`` returns the external identifier, the 
mixin persists
+that identifier to ``task_store`` before calling ``poll_until_complete``. If 
the worker
+crashes during polling, the next retry reads the stored identifier and calls 
``get_job_status``
+to check the current state of the job:
+
+* If the job is still running, the mixin calls ``poll_until_complete`` to 
reconnect and continue
+  waiting.
+* If the job already completed successfully, the mixin calls 
``get_job_result`` and returns
+  immediately without resubmitting.
+* If the job is in a terminal failure state, the mixin falls through and 
submits a fresh job.
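
The decision flow above can be outlined in plain Python. This is a sketch under stated assumptions, not the mixin's actual implementation: ``execute_resumable_sketch`` and ``FakeJobClient`` are hypothetical names, a ``dict`` stands in for ``task_store``, the ``context`` argument is omitted, and returning the job result after reconnecting is assumed.

```python
def execute_resumable_sketch(op, task_store, key="remote_job_id"):
    """Hypothetical outline of the submit-or-reconnect decision."""
    external_id = task_store.get(key)
    if external_id is not None:
        status = op.get_job_status(external_id)
        if op.is_job_active(status):
            op.poll_until_complete(external_id)  # reconnect to the running job
            return op.get_job_result(external_id)
        if op.is_job_succeeded(status):
            return op.get_job_result(external_id)  # finished while the worker was down
        # Terminal failure: fall through and submit a fresh job.

    external_id = op.submit_job()
    task_store[key] = external_id  # persist the identifier before polling starts
    op.poll_until_complete(external_id)
    return op.get_job_result(external_id)


class FakeJobClient:
    """Hypothetical in-memory external system used only for this demonstration."""

    def __init__(self):
        self.jobs = {}
        self.submitted = 0

    def submit_job(self):
        self.submitted += 1
        job_id = f"job-{self.submitted}"
        self.jobs[job_id] = "RUNNING"
        return job_id

    def get_job_status(self, external_id):
        return self.jobs[external_id]

    def is_job_active(self, status):
        return status in ("RUNNING", "PENDING")

    def is_job_succeeded(self, status):
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id):
        self.jobs[external_id] = "SUCCEEDED"  # pretend the job finishes at once

    def get_job_result(self, external_id):
        return f"result of {external_id}"


client = FakeJobClient()
store = {}

first = execute_resumable_sketch(client, store)   # no identifier stored: submits job-1
second = execute_resumable_sketch(client, store)  # job-1 already succeeded: no resubmission
submitted_after_retry = client.submitted

client.jobs["job-1"] = "FAILED"                   # simulate a terminal failure
third = execute_resumable_sketch(client, store)   # falls through and submits job-2
```

The second call returns the stored job's result without calling ``submit_job`` again, which is the duplicate-prevention behaviour the list above describes.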
+
+.. note::
+
+   There is a small window between ``submit_job`` returning and 
``task_store.set`` completing.
+   If the worker crashes in that gap, the next retry does not have the 
identifier and will
+   submit a fresh job. For most workloads this window is negligible.
+
+Example
+-------
+
+.. code-block:: python
+
+    from airflow.sdk import BaseOperator, ResumableJobMixin
+    from pydantic import JsonValue
+
+
+    class MyBatchOperator(ResumableJobMixin, BaseOperator):
+
+        external_id_key = "batch_job_id"
+
+        def execute(self, context):
+            return self.execute_resumable(context)
+
+        def submit_job(self, context) -> JsonValue:
+            return self.hook.submit_batch(...)
+
+        def get_job_status(self, external_id: JsonValue, context) -> str:
+            return self.hook.get_status(external_id)
+
+        def is_job_active(self, status: str) -> bool:
+            return status in ("RUNNING", "PENDING", "QUEUED")
+
+        def is_job_succeeded(self, status: str) -> bool:
+            return status == "SUCCEEDED"
+
+        def poll_until_complete(self, external_id: JsonValue, context) -> None:
+            self.hook.wait(external_id)
+
+        def get_job_result(self, external_id: JsonValue, context):
+            return None
+
+.. _sdk-resumable-job-mixin-external-id-key:
+
+External ID key
+---------------
+
+The ``external_id_key`` class attribute controls which key is used to store 
the job identifier
+in ``task_store``. The default value is ``"remote_job_id"``. You can override 
it on your
+subclass to use a more descriptive name:
+
+.. code-block:: python
+
+    class MyBatchOperator(ResumableJobMixin, BaseOperator):
+        external_id_key = "batch_job_id"
+
+.. warning::
+
+   Do not rename ``external_id_key`` on an operator that is already deployed and
+   has in-flight task instances. Their job identifiers are already stored in the
+   task state store under the previous key. After a rename, the mixin finds
+   nothing under the new key and treats every active retry as a fresh
+   submission, defeating the crash-safety guarantee.
