amoghrajesh opened a new pull request, #68213:
URL: https://github.com/apache/airflow/pull/68213
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
---
##### Was generative AI tooling used to co-author this PR?
- [x] Yes: Claude Sonnet 4.6
### What
`ResumableJobMixin` makes synchronous operators crash-safe by persisting an
external job ID to `task_store` before polling and then reconnecting to that
job on retry. What it lacks is a way to observe, at runtime, which decision it
made (submitted a fresh job, reconnected to an existing one, or skipped a
duplicate) without manually reading task logs. Adding that visibility is a big
win for debuggability.
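To make the persist-before-poll ordering concrete, here is a minimal sketch, not the mixin's actual code. A plain `dict` stands in for `task_store`, and `FakeBatchSystem`, `WorkerCrash`, and `run_attempt` are hypothetical names. The key assumption is that the external job keeps running after the worker dies, so a retry can find it again by the stored ID.

```python
from __future__ import annotations


class WorkerCrash(Exception):
    """Hypothetical stand-in for the worker process dying mid-poll."""


class FakeBatchSystem:
    """Hypothetical external system whose jobs outlive the Airflow worker."""

    def __init__(self) -> None:
        self.jobs: dict[str, str] = {}
        self.submissions = 0

    def submit(self) -> str:
        self.submissions += 1
        job_id = f"job-{self.submissions}"
        self.jobs[job_id] = "RUNNING"
        return job_id


def run_attempt(task_store: dict[str, str], system: FakeBatchSystem, crash: bool = False) -> str:
    """One task attempt; ``task_store`` is a plain dict standing in for Airflow's task_store."""
    job_id = task_store.get("job_id")
    status = system.jobs.get(job_id) if job_id is not None else None
    if status == "SUCCEEDED":
        return job_id  # already finished: do not submit a duplicate
    if status != "RUNNING":
        job_id = system.submit()
        # Persist the ID *before* polling, so a crash from here on is recoverable.
        task_store["job_id"] = job_id
    if crash:
        raise WorkerCrash("worker killed while polling")
    system.jobs[job_id] = "SUCCEEDED"  # pretend polling ran to completion
    return job_id


store: dict[str, str] = {}
system = FakeBatchSystem()
try:
    run_attempt(store, system, crash=True)  # attempt 1: submits, then "crashes"
except WorkerCrash:
    pass
print(run_attempt(store, system), system.submissions)  # prints: job-1 1
```

Because the ID is written before polling starts, the second attempt finds `job-1` still running and reconnects to it, so the external system sees exactly one submission.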
### Current behaviour
No metrics or traces are emitted by `ResumableJobMixin`. The reconnect vs.
resubmit decision is invisible to monitoring systems.
### Proposed change
Add three StatsD counters and an OpenTelemetry span to `execute_resumable()`:
**Counters** (tagged `operator=<ClassName>`):
- `resumable_job.fresh_submit`: a new job was submitted (first run or after terminal failure)
- `resumable_job.reconnect_attempt`: a stored ID was found and its status checked
- `resumable_job.reconnect_success`: the stored job was still active and the mixin reconnected to it
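Reading the three counter descriptions together gives an expected counter set for each path through `execute_resumable()`. The sketch below is a hedged reconstruction of that mapping, not the PR's code: `resume_counters` is a hypothetical helper, the status strings `RUNNING`, `SUCCEEDED`, and `FAILED` are borrowed from the test DAG in the Testing section, and it is an assumption that an already-succeeded job emits only `reconnect_attempt`. A `collections.Counter` keyed by (metric name, operator tag) stands in for the StatsD client.

```python
from __future__ import annotations

from collections import Counter


def resume_counters(operator: str, prior_status: str | None) -> Counter:
    """Counters one execute_resumable() call would emit, under this sketch's reading.

    ``prior_status`` is None when task_store holds no ID; otherwise it is the
    status reported for the stored ID ("RUNNING", "SUCCEEDED" or "FAILED" here).
    """
    emitted: Counter = Counter()

    def incr(name: str) -> None:
        # Key on (metric name, operator tag) to mirror the operator=<ClassName> tag.
        emitted[(f"resumable_job.{name}", operator)] += 1

    if prior_status is None:
        incr("fresh_submit")  # first run: nothing stored yet
    else:
        incr("reconnect_attempt")  # stored ID found and its status checked
        if prior_status == "RUNNING":
            incr("reconnect_success")  # still active: reconnect instead of resubmitting
        elif prior_status != "SUCCEEDED":
            incr("fresh_submit")  # terminal failure: submit a new job
    return emitted
```

Under this reading, the crash-and-retry scenario in the Testing section should show `fresh_submit` once for the first attempt, then `reconnect_attempt` and `reconnect_success` once each for the retry.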
**Trace span** (`resumable_job.resume_decision`):
- Wraps only the state load and decision block, not the full poll duration
- Attributes: `operator`, `resumable.external_id_key`, `resumable.decision`, `resumable.external_id`, `resumable.prior_status`
- Automatically becomes a child of the task runner's existing span
### Changes of Note
The span deliberately excludes `submit_job()` and `poll_until_complete()`,
since those belong to the operator that uses `ResumableJobMixin`. It covers
only the decision the mixin itself owns: read the stored ID -> check its
status -> decide what to do next.
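The span boundary can be sketched as follows. `RecordingSpan` and `start_span` are hypothetical stand-ins for an OpenTelemetry tracer; with the OTel API, `tracer.start_as_current_span(...)` would play this role and would also parent the span under whatever span is current, such as the task runner's. The decision labels `fresh_submit`, `reconnect`, and `skip` are placeholders, since the values of `resumable.decision` are not listed here.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Callable, Iterator


class RecordingSpan:
    """Hypothetical stand-in for an OpenTelemetry span that just records attributes."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.attributes: dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


events: list[str] = []  # ordered log of span boundaries and operator calls
finished_spans: list[RecordingSpan] = []


@contextmanager
def start_span(name: str) -> Iterator[RecordingSpan]:
    span = RecordingSpan(name)
    events.append(f"start {name}")
    try:
        yield span
    finally:
        events.append(f"end {name}")
        finished_spans.append(span)


def execute_resumable(
    operator: str,
    task_store: dict[str, str],
    get_status: Callable[[str], str],
    submit: Callable[[], str],
    poll: Callable[[str], None],
) -> str:
    with start_span("resumable_job.resume_decision") as span:
        # Inside the span: read stored ID -> check status -> decide.
        span.set_attribute("operator", operator)
        span.set_attribute("resumable.external_id_key", "job_id")
        external_id = task_store.get("job_id")
        decision = "fresh_submit"
        if external_id is not None:
            prior_status = get_status(external_id)
            span.set_attribute("resumable.external_id", external_id)
            span.set_attribute("resumable.prior_status", prior_status)
            if prior_status == "RUNNING":
                decision = "reconnect"
            elif prior_status == "SUCCEEDED":
                decision = "skip"
        span.set_attribute("resumable.decision", decision)
    # Outside the span: operator-owned work, so long polls do not stretch the span.
    if decision == "skip":
        return external_id
    if decision == "fresh_submit":
        external_id = submit()
        task_store["job_id"] = external_id
    poll(external_id)
    return external_id
```

Keeping `submit()` and `poll()` after the `with` block means the span's duration reflects only the mixin's own work, which is what makes it useful for comparing decisions across runs.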
### Testing
My setup ran Breeze with the StatsD integration plus a manually started Jaeger:
`breeze start-airflow --integration statsd` (in my local setup this includes
statsd-exporter, Prometheus, Grafana, and Jaeger).
Tested with this DAG:
```python
import time
from datetime import datetime, timedelta
from typing import Any

from pydantic import JsonValue  # assumption: any JSON-compatible type alias works here

from airflow.sdk import DAG, BaseOperator, Context

# ResumableJobMixin must also be imported; its module path is not shown in this PR.
# _JOBS (a local job-status cache), _generate_job_id, _submit, _poll and _get_status
# are mock helpers defined elsewhere in the test DAG file and are not shown here.


class MockBatchOperator(BaseOperator, ResumableJobMixin):
    """
    Submits a mock batch job and polls until completion.

    Demonstrates how ``ResumableJobMixin`` handles a worker crash between
    submission and the start of polling: on retry the job ID is read from
    ``task_store`` and the operator reconnects rather than resubmitting.
    """

    # Key under which the external job ID is persisted in task_store.
    external_id_key = "job_id"

    def execute(self, context: Context) -> Any:
        # Delegate to the mixin, which decides whether to submit or reconnect.
        return self.execute_resumable(context)

    def submit_job(self, context: Context) -> JsonValue:
        job_id = _generate_job_id()
        _submit(job_id)
        self.log.info("Submitted batch job", job_id=job_id)
        return job_id

    def get_job_status(self, external_id: JsonValue, context: Context) -> str:
        # Default to RUNNING if the ID is not in the local cache. This simulates
        # a real external system that keeps running after a worker crash.
        return _JOBS.get(str(external_id), "RUNNING")

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id: JsonValue, context: Context) -> None:
        self.log.info("Polling job: kill the worker now to trigger a retry", job_id=external_id)
        # Leave a 60-second window in which to kill the worker mid-poll.
        time.sleep(60)
        self.log.info("Polling job until complete", job_id=external_id)
        _poll(str(external_id))

    def get_job_result(self, external_id: JsonValue, context: Context) -> str:
        return f"Job {external_id} finished with status {_get_status(str(external_id))}"


with DAG(
    dag_id="example_resumable_job",
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["example", "resumable", "task-store"],
    doc_md=__doc__,
):
    MockBatchOperator(
        task_id="run_batch_job",
        retries=2,
        retry_delay=timedelta(seconds=5),
    )
```
- Ran the DAG
- Killed the worker a few seconds into the run
- Brought the worker back up and let the task retry, reconnect to the job, and succeed
#### Statsd exporter
<img width="930" height="109" alt="image"
src="https://github.com/user-attachments/assets/d058c9ba-68a6-4e9c-9e15-207a5d19c952"
/>
#### Prometheus values
<img width="1726" height="653" alt="image"
src="https://github.com/user-attachments/assets/f7be03d1-ae8c-462a-844b-9987a8c5b698"
/>
#### Jaeger span traces
<img width="1726" height="653" alt="image"
src="https://github.com/user-attachments/assets/4b2ad037-9134-4c40-9c60-7b415f5cc4d1"
/>
Decisions:
<img width="1726" height="901" alt="image"
src="https://github.com/user-attachments/assets/17d9fdba-b266-4f9a-8d41-deb6b3391dff"
/>
Resumable job decisions:
<img width="1726" height="901" alt="image"
src="https://github.com/user-attachments/assets/12ac7819-701e-4764-bddc-48095bedbc15"
/>