1fanwang opened a new pull request, #68955:
URL: https://github.com/apache/airflow/pull/68955

   > **Stacked PR.** Builds on #68936 (TriggerDagRunOperator `durable`) and #68952 (extract
   > `resume_or_submit`). The incremental change here is the **last commit**, the runner refactor;
   > the earlier commits are the two parent PRs and will drop out of the diff once they merge.
   
   ## Why
   
   #68936 added a crash-safe `durable` wait to `TriggerDagRunOperator`, but it **hand-rolled** the
   persist-and-reconnect logic in the task runner, duplicating what `ResumableJobMixin` already
   implements. #68952 lifted that logic into a reusable `resume_or_submit` core. This PR makes the
   runner **consume** that core, so the durability primitive has one implementation instead of two.
   
   ## What
   
   `_handle_trigger_dag_run`'s durable path now drives `resume_or_submit` through four runner
   callbacks:

   - `submit`: sends `TriggerDagRun`, raising on `DagRunAlreadyExists`.
   - `get_status`: sends `GetDagRunState`.
   - `poll`: runs the wait loop, raising on a failed state so the retry policy still fires.
   - `get_result`: returns the run-id XCom.

   The hand-rolled `_evaluate_prior_triggered_run` and the inline persist/decision logic are
   **deleted**. No behaviour change.
   
   This shows that the #68952 extraction generalises: operators that fit `ResumableJobMixin`
   directly (Spark, Livy) and an operator whose wait lives in the runner (TriggerDagRun) now share
   the same durability core.
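
The callback wiring described above can be sketched as follows. This is a minimal illustration only, not the code in this PR: the `resume_or_submit` signature, the `make_bindings` helper, the in-memory `runs` dict and the `WorkerKilled` exception are all hypothetical stand-ins, and the core is deliberately simplified to reconnect whenever an id has been persisted.

```python
from typing import Callable, Dict, Optional

KEY = "triggered_dag_run_id"  # key name as it appears in the log excerpt of this PR


class DagRunAlreadyExists(Exception):
    """Stand-in for the conflict error; not imported from Airflow here."""


class WorkerKilled(Exception):
    """Simulates the worker dying mid-wait (a real SIGKILL raises nothing)."""


def resume_or_submit(store, key, submit, get_status, poll, get_result):
    """Hypothetical core: reuse a persisted external id if present, else submit."""
    external_id = store.get(key)
    if external_id is not None:
        status = get_status(external_id)
        print(f"Reconnecting to existing job external_id={external_id} status={status}")
    else:
        external_id = submit()
        store[key] = external_id  # persist before waiting so a retry can find it
    poll(external_id)
    return get_result(external_id)


def make_bindings(runs: Dict[str, str], run_id: str, during_wait: Callable[[], None]):
    """Hypothetical runner-side bindings backed by an in-memory dict of run states."""

    def submit() -> str:
        if run_id in runs:
            raise DagRunAlreadyExists(run_id)
        runs[run_id] = "running"
        return run_id

    def get_status(rid: str) -> Optional[str]:
        return runs.get(rid)

    def poll(rid: str) -> None:
        during_wait()  # whatever happens while the parent task is waiting
        if runs[rid] == "failed":
            raise RuntimeError(f"{rid} failed")  # lets the retry policy fire

    def get_result(rid: str) -> str:
        return rid  # the run id is the result

    return {"submit": submit, "get_status": get_status, "poll": poll, "get_result": get_result}


runs: Dict[str, str] = {}   # child Dag runs that exist
store: Dict[str, str] = {}  # state that survives across task attempts
RUN_ID = "manual__example"


def killed() -> None:
    raise WorkerKilled()


def child_finishes() -> None:
    runs[RUN_ID] = "success"


# Attempt 1: the run is submitted and persisted, then the worker dies mid-wait.
try:
    resume_or_submit(store, KEY, **make_bindings(runs, RUN_ID, killed))
except WorkerKilled:
    pass

# Attempt 2: the persisted id is found, so nothing is submitted again.
result = resume_or_submit(store, KEY, **make_bindings(runs, RUN_ID, child_finishes))
```

In this sketch the second attempt never calls `submit`, so `runs` ends with a single entry, which mirrors the "1 run from one parent task, no duplicate" outcome reported under Tests.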
   
   ## Tests
   
   The full `TriggerDagRunOperator` runner suite passes unchanged: the `durable` reconnect tests now
   exercise the `resume_or_submit` path, and the non-durable / deferrable / conflict tests are
   untouched. That unchanged suite is the behaviour-preservation proof.
   
   ### End-to-end (live, Breeze, on this PR's code)
   
   Same crash scenario as #68936, re-run on the refactored code: a parent
   `TriggerDagRunOperator(durable=True, wait_for_completion=True)` triggers a child Dag; the parent's
   worker is `SIGKILL`ed mid-wait; the scheduler retries. Attempt 2 reconnects **through the shared
   core**: the log line is `resume_or_submit`'s own "Reconnecting to existing job", which shows the
   durable path now runs the framework primitive rather than the deleted hand-rolled logic.
   
   <details><summary>Raw</summary>
   
   ```text
   attempt=2.log:
     "Reconnecting to existing job"  external_id_key=triggered_dag_run_id
        external_id=manual__2026-...T20:18:21Z  status=running
   
   parent run: success   (trigger_child succeeded on try_number=2)
   child dag runs since the trigger:
     manual__2026-...T20:18:21Z  success   <- single run, reconnected
     => 1 run from one parent task, no duplicate
   ```
   
   </details>
   
   ## Risk
   
   Behaviour-preserving refactor. The durable path's persist + three-state reconnect is now the
   shared core (covered by #68952's tests); the runner only supplies the bindings.
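
For readers unfamiliar with the parent PRs, one plausible reading of the "three-state reconnect" is a pure decision over the persisted id and the prior run's status. The sketch below is an assumption, not taken from #68952: the `Action` names, the `decide` function and the status strings are hypothetical, and the real core may classify states differently.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    SUBMIT = "submit"              # nothing usable was persisted: start a new job
    RECONNECT = "reconnect"        # prior job still in flight: wait on it
    REUSE_RESULT = "reuse_result"  # prior job already finished successfully


def decide(external_id: Optional[str], status: Optional[str]) -> Action:
    """Hypothetical decision: map (persisted id, prior status) to one of three actions."""
    if external_id is None or status is None:
        return Action.SUBMIT
    if status == "success":
        return Action.REUSE_RESULT
    if status in ("queued", "running"):
        return Action.RECONNECT
    # Assumption: any other terminal state (e.g. "failed") leads to a fresh submit.
    return Action.SUBMIT
```

Keeping the decision free of side effects is what would let the runner supply only the bindings while the tests for the decision live with the core.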
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [ ] Yes (please specify the tool below)
   

