amoghrajesh commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3308510775


##########
task-sdk/src/airflow/sdk/bases/resumablemixin.py:
##########
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from pydantic import JsonValue
+
+    from airflow.sdk.definitions.context import Context
+    from airflow.sdk.types import Logger
+
+
+class ResumableJobMixin:
+    """
+    Mixin for operators that submit one long-running job to an external system 
and poll for completion.
+
+    **Purpose:** This mixin makes the synchronous operator path crash-safe. It 
is not a replacement
+    for deferrable operators — deferrable remains the recommended approach for 
long-running tasks when
+    a Triggerer is available and the async model fits the team. This mixin is 
for teams already running
+    synchronous operators who want worker crashes to reconnect to the existing 
job rather than
+    resubmitting a duplicate.
+
+    **How it works:** On the first run, after submitting the job, the external 
ID (driver ID, YARN
+    application ID, etc.) is persisted to ``task_state`` before polling 
starts. On retry, the mixin
+    reads that ID back and reconnects to the already-running job instead of 
starting a new one.
+
+    **What it does not do:** It does not free the worker slot during polling 
(use deferrable for that),
+    and it does not stream logs from the remote system (the operator controls 
that separately).
+
+    Usage: call ``execute_resumable(context)`` from the operator's 
``execute()`` when reconnection
+    is supported.
+
+    Subclasses must implement the methods specific to their external system. 
The mixin owns
+    only ``execute_resumable()`` and the task_state read/write logic.
+
+    Example::
+
+        class MyOperator(ResumableJobMixin, BaseOperator):
+            external_id_key = "my_job_id"
+
+            def execute(self, context):
+                return self.execute_resumable(context)
+
+            def submit_job(self, context) -> JsonValue:
+                return self.hook.submit(...)
+
+            def get_job_status(self, external_id: JsonValue) -> str:
+                return self.hook.get_status(external_id)
+
+            def is_job_active(self, status: str) -> bool:
+                return status in ("RUNNING", "PENDING")
+
+            def is_job_succeeded(self, status: str) -> bool:
+                return status == "SUCCEEDED"
+
+            def poll_until_complete(self, external_id: JsonValue, context: 
Context) -> None:
+                self.hook.poll(external_id)
+
+            def get_job_result(self, external_id: JsonValue, context: Context) 
-> Any:
+                return None
+    """
+
+    if TYPE_CHECKING:
+        # log comes from BaseOperator (via LoggingMixin) at runtime, but mypy 
cannot see
+        # that because ResumableJobMixin does not inherit from it directly.
+        log: Logger
+
+    # Key used to store and retrieve the external job ID from task_state 
across retries.
+    # Renaming this on a deployed operator breaks in-flight retries — the old 
key is already stored.
+    external_id_key: str = "remote_job_id"
+
+    def execute_resumable(self, context: Context) -> Any:
+        """
+        Core of the resumable execution logic. Call this from execute() when 
reconnection is supported.
+
+        On initial run: submits the job, persists the external ID to 
task_state, then polls.
+
+        Behaviour on retry:
+        - On retry with active job: skips submission, reconnects to the 
running job.
+        - On retry with succeeded job: skips submission and polling, returns 
result immediately.
+        - On retry with failed job: falls through and resubmits fresh.
+        """
+        task_state = context.get("task_state")
+
+        if task_state is not None:
+            external_id = task_state.get(self.external_id_key)
+            if external_id:
+                status = self.get_job_status(external_id)
+                if self.is_job_active(status):
+                    self.log.info(
+                        "Reconnecting to existing job identified by: %s 
(status: %s)", external_id, status
+                    )
+                    return self.poll_until_complete(external_id, context)
+                if self.is_job_succeeded(status):
+                    self.log.info(
+                        "Job with identifier: %s already completed 
successfully, skipping resubmission",
+                        external_id,
+                    )
+                    return self.get_job_result(external_id, context)
+                self.log.info(
+                    "Prior job with identifier: %s in terminal state %s, 
resubmitting fresh",
+                    external_id,
+                    status,
+                )
+
+        external_id = self.submit_job(context)
+
+        if task_state is not None:
+            task_state.set(self.external_id_key, external_id)

Review Comment:
   Yeah fair, documenting as known limitation. The window can be closed only if 
`task_state.set` is atomic with `submit_job`, which it can't be.
   
   The sentinel approach has the same window one step earlier: if the worker 
dies after writing "pending" but before submit_job returns, the next retry sees 
"pending", doesn't know if the job actually launched or not, and has no ID to 
check against — and we end up trading a known gap (stale ID) for an unknown gap 
(did the job even start?). We still cannot guarantee atomicity between "write 
to task_state" and "call external system".
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to