amoghrajesh opened a new pull request, #68213:
URL: https://github.com/apache/airflow/pull/68213

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] Yes: claude sonnet 4.6
   
   
   ### What
   
   `ResumableJobMixin` makes synchronous operators crash-safe by persisting an external job ID to `task_store` before polling, then reconnecting to that job on retry. Until now there has been no way to observe which decision the mixin made at runtime (fresh submit, reconnect, or skipped duplicate) without reading task logs manually. Exposing that decision through metrics and traces is a big win for debuggability.
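
The persist-then-reconnect behaviour described above can be sketched without Airflow. This is only an illustration under stated assumptions: a plain dict stands in for `task_store`, and `run_attempt`, `EXTERNAL_JOBS`, and `WorkerCrash` are hypothetical names, not part of the mixin's API.

```python
import itertools

# Hypothetical stand-ins: one dict plays the role of ``task_store`` and another
# plays the external batch system. Neither is Airflow's real API.
task_store: dict[str, str] = {}
EXTERNAL_JOBS: dict[str, str] = {}
_ids = itertools.count(1)


class WorkerCrash(Exception):
    """Simulates the worker dying while the external job keeps running."""


def run_attempt(crash_before_poll: bool = False) -> str:
    job_id = task_store.get("job_id")
    if job_id is None or EXTERNAL_JOBS.get(job_id) not in ("RUNNING", "SUCCEEDED"):
        # Nothing usable is stored: submit a fresh job and persist its ID
        # *before* polling, so a crash during polling cannot lose it.
        job_id = f"job-{next(_ids)}"
        EXTERNAL_JOBS[job_id] = "RUNNING"
        task_store["job_id"] = job_id
    if crash_before_poll:
        raise WorkerCrash(job_id)
    # "Polling": the mock job completes as soon as we look at it.
    EXTERNAL_JOBS[job_id] = "SUCCEEDED"
    return job_id


try:
    run_attempt(crash_before_poll=True)  # first try: submits, then crashes
except WorkerCrash:
    pass
reconnected_id = run_attempt()  # retry: finds the stored ID and reconnects
```

After the retry, only one job exists in the mock external system, which is the property the mixin is meant to guarantee.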
   
   ### Current behaviour
   
   No metrics or traces are emitted by `ResumableJobMixin`. The reconnect vs. 
resubmit decision is invisible to monitoring systems.
   
   ### Proposed change
   
   This PR adds three StatsD counters and an OpenTelemetry span to `execute_resumable()`:
   
   **Counters** (tagged `operator=<ClassName>`):
   - `resumable_job.fresh_submit`: a new job was submitted (first run or after 
terminal failure)
   - `resumable_job.reconnect_attempt`: a stored ID was found and its status 
checked
   - `resumable_job.reconnect_success`: the stored job was still active and 
reconnected to
   
   **Trace span** (`resumable_job.resume_decision`):
   - Wraps only the state load and decision block, not the full poll duration
   - Attributes: `operator`, `resumable.external_id_key`, `resumable.decision`, 
`resumable.external_id`, `resumable.prior_status`
   - Automatically becomes a child of the task runner's existing span
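
As a rough sketch of how the counters and span attributes listed above might relate to each decision path, the following uses a `collections.Counter` as a stand-in for the StatsD client and returns the span attributes as a plain dict. The function name `resume_decision` and the decision strings (`fresh_submit`, `reconnect`, `skip_duplicate`) are assumptions made for illustration; the values the PR actually emits may differ.

```python
from collections import Counter
from typing import Optional

# Stand-in for a StatsD client; keys are (metric name, operator tag).
counters = Counter()


def resume_decision(operator: str, stored_id: Optional[str], prior_status: Optional[str]) -> dict:
    """Return the attributes a ``resumable_job.resume_decision`` span could carry."""
    attrs = {"operator": operator, "resumable.external_id_key": "job_id"}
    if stored_id is None:
        decision = "fresh_submit"  # first run: nothing stored
    else:
        # A stored ID was found and its status checked.
        counters[("resumable_job.reconnect_attempt", operator)] += 1
        attrs["resumable.external_id"] = stored_id
        attrs["resumable.prior_status"] = prior_status
        if prior_status == "RUNNING":
            decision = "reconnect"
            counters[("resumable_job.reconnect_success", operator)] += 1
        elif prior_status == "SUCCEEDED":
            decision = "skip_duplicate"  # assumed label for the "skipped" path
        else:
            decision = "fresh_submit"  # terminal failure: submit again
    if decision == "fresh_submit":
        counters[("resumable_job.fresh_submit", operator)] += 1
    attrs["resumable.decision"] = decision
    return attrs


first = resume_decision("MockBatchOperator", None, None)
retry = resume_decision("MockBatchOperator", "job-1", "RUNNING")
```

Under these assumptions, a crash followed by a successful reconnect yields one `fresh_submit`, one `reconnect_attempt`, and one `reconnect_success` for the operator.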
   
   ### Changes of Note
   
   The span deliberately excludes `submit_job()` and `poll_until_complete()`, since those belong to the operator that uses `ResumableJobMixin`. The span covers only the decision the mixin itself owns: read stored ID -> check status -> decide what to do next.
   
   ### Testing
   
   My setup was Breeze with the statsd integration plus a manually started Jaeger: `breeze start-airflow --integration statsd` (which, in my local environment, includes statsd-exporter, Prometheus, Grafana, and Jaeger).
   
   
   Tested with this DAG:
   
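   ```python
   import time
   from datetime import datetime, timedelta
   from typing import Any

   # Airflow imports (DAG, BaseOperator, Context, JsonValue, ResumableJobMixin) and the
   # mock helpers (_JOBS, _generate_job_id, _submit, _poll, _get_status) are omitted here.

   class MockBatchOperator(BaseOperator, ResumableJobMixin):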
       """
       Submits a mock batch job and polls until completion.
   
       Demonstrates how ``ResumableJobMixin`` handles a worker crash between
       submission and the start of polling: on retry the job ID is read from
       ``task_store`` and the operator reconnects rather than resubmitting.
       """
   
       external_id_key = "job_id"
   
       def execute(self, context: Context) -> Any:
           return self.execute_resumable(context)
   
       def submit_job(self, context: Context) -> JsonValue:
           job_id = _generate_job_id()
           _submit(job_id)
           self.log.info("Submitted batch job", job_id=job_id)
           return job_id
   
       def get_job_status(self, external_id: JsonValue, context: Context) -> str:
           # Default to RUNNING if the ID is not in the local cache; this simulates
           # a real external system that keeps running after a worker crash.
           return _JOBS.get(str(external_id), "RUNNING")
   
       def is_job_active(self, status: str) -> bool:
           return status == "RUNNING"
   
       def is_job_succeeded(self, status: str) -> bool:
           return status == "SUCCEEDED"
   
       def poll_until_complete(self, external_id: JsonValue, context: Context) -> None:
           self.log.info("Polling job; kill the worker now to trigger a retry", job_id=external_id)
           time.sleep(60)
           self.log.info("Polling job until complete", job_id=external_id)
           _poll(str(external_id))
   
       def get_job_result(self, external_id: JsonValue, context: Context) -> str:
           return f"Job {external_id} finished with status {_get_status(str(external_id))}"
   
   
   with DAG(
       dag_id="example_resumable_job",
       schedule=None,
       start_date=datetime(2026, 1, 1),
       catchup=False,
       tags=["example", "resumable", "task-store"],
       doc_md=__doc__,
   ):
       MockBatchOperator(
           task_id="run_batch_job",
           retries=2,
           retry_delay=timedelta(seconds=5),
       )
   
   ```
   
   - Ran the DAG
   - Killed the worker a few seconds into the run
   - Brought the worker back up and let the job reconnect and succeed
   
   #### Statsd exporter
   <img width="930" height="109" alt="image" src="https://github.com/user-attachments/assets/d058c9ba-68a6-4e9c-9e15-207a5d19c952" />
   
   
   #### Prometheus values
   
   <img width="1726" height="653" alt="image" src="https://github.com/user-attachments/assets/f7be03d1-ae8c-462a-844b-9987a8c5b698" />
   
   
   #### Jaeger span traces
   
   <img width="1726" height="653" alt="image" src="https://github.com/user-attachments/assets/4b2ad037-9134-4c40-9c60-7b415f5cc4d1" />
   
   
   Decisions:
   
   <img width="1726" height="901" alt="image" src="https://github.com/user-attachments/assets/17d9fdba-b266-4f9a-8d41-deb6b3391dff" />
   
   
   Resumable job decisions:
   
   <img width="1726" height="901" alt="image" src="https://github.com/user-attachments/assets/12ac7819-701e-4764-bddc-48095bedbc15" />
   
   
   

