bujjibabukatta opened a new pull request, #68277:
URL: https://github.com/apache/airflow/pull/68277

   Closes #67168
   
   ## Summary
   
   Adds a `deferrable: bool = False` parameter to `SparkSubmitOperator`, as requested in #67168 (a follow-up to #67118).
   
   ## Two execution modes
   
   | Mode | Behaviour |
   |---|---|
   | `deferrable=False` (default) | Existing synchronous path via `ResumableJobMixin`: the worker slot is held during polling, and the task reconnects to the existing driver on infrastructure failure |
   | `deferrable=True` | Submits the job, then calls `self.defer()` → `SparkDriverTrigger`: the worker slot is freed during polling, and crash recovery is handled by trigger-row persistence |
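
The two paths can be modelled in plain Python. This is a minimal, hypothetical sketch, not the PR's code: `SparkSubmitSketch`, `TaskDeferredSketch`, and the fixed driver ID are illustrative stand-ins. In real Airflow, `self.defer()` raises `TaskDeferred`; the sketch imitates that with its own exception so it runs without Airflow installed.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


class TaskDeferredSketch(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""

    def __init__(self, trigger_kwargs: dict[str, Any], method_name: str) -> None:
        super().__init__(method_name)
        self.trigger_kwargs = trigger_kwargs
        self.method_name = method_name


@dataclass
class SparkSubmitSketch:
    """Hypothetical model of the two execution modes in the table above."""

    deferrable: bool = False
    task_state: dict[str, Any] = field(default_factory=dict)
    worker_polls: int = 0  # polls performed while holding the worker slot

    def _submit(self) -> str:
        # Hypothetical submission; a real operator would run spark-submit here.
        driver_id = "driver-20240101000000-0000"
        self.task_state["spark_job_id"] = driver_id
        return driver_id

    def _poll_until_done(self, driver_id: str) -> str:
        # Sync mode: polling happens on the worker, so the slot stays occupied.
        self.worker_polls += 1
        return "FINISHED"

    def execute(self) -> str:
        driver_id = self._submit()
        if self.deferrable:
            # Deferrable mode: hand polling to a trigger and release the worker slot.
            raise TaskDeferredSketch({"driver_id": driver_id}, "execute_complete")
        return self._poll_until_done(driver_id)

    def execute_complete(self, event: dict[str, Any]) -> str:
        # Called on a worker once the trigger fires; fail unless the driver finished.
        # (The PR raises AirflowException; RuntimeError stands in for it here.)
        if event.get("status") != "FINISHED":
            raise RuntimeError(f"Spark driver ended in state {event.get('status')}")
        return event["status"]


if __name__ == "__main__":
    print(SparkSubmitSketch(deferrable=False).execute())  # FINISHED
    try:
        SparkSubmitSketch(deferrable=True).execute()
    except TaskDeferredSketch as deferred:
        print(deferred.method_name)  # execute_complete
```

Raising from `execute()` mirrors how Airflow unwinds a deferring task: nothing after the defer call runs, and work resumes in the named callback.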
   
   ## Changes
   
   ### `operators/spark_submit.py`
   - Add a `deferrable: bool` parameter whose default follows the `operators.default_deferrable` config option
   - Add an `if self.deferrable:` branch in `execute()` that submits the job and then defers to `SparkDriverTrigger`
   - Add `execute_complete()`, the callback invoked when the trigger completes; it raises `AirflowException` on failure
   - Add `_build_master_rest_urls()`, which builds the REST API URLs from the connection extras and supports HA (comma-separated masters)
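
The HA URL-building step can be sketched as follows. This is an illustration under stated assumptions, not the PR's `_build_master_rest_urls()`: the hypothetical `build_master_rest_urls()` takes an already-assembled master string in Spark's HA form (`spark://host1:port1,host2:port2`), drops the RPC ports, and assumes each master's REST submission server listens on port 6066 (Spark's default for `spark.master.rest.port`). The status path follows Spark's standalone REST submission API (`/v1/submissions/status/<submissionId>`).

```python
from __future__ import annotations


def build_master_rest_urls(master: str, rest_port: int = 6066, scheme: str = "http") -> list[str]:
    """Hypothetical helper: turn a (possibly HA) standalone master URL into REST base URLs.

    Assumes ``master`` looks like ``spark://h1:7077,h2:7077`` and that every master's
    REST submission server listens on ``rest_port``. IPv6 literals are not handled.
    """
    if master.startswith("spark://"):
        master = master[len("spark://"):]
    urls: list[str] = []
    for host_port in master.split(","):
        host_port = host_port.strip()
        if not host_port:
            continue  # tolerate stray commas
        host = host_port.rsplit(":", 1)[0]  # drop the RPC port if present
        urls.append(f"{scheme}://{host}:{rest_port}")
    return urls


def driver_status_url(base_url: str, driver_id: str) -> str:
    # Spark standalone REST status endpoint for a submitted driver.
    return f"{base_url}/v1/submissions/status/{driver_id}"


if __name__ == "__main__":
    for base in build_master_rest_urls("spark://h1:7077,h2:7077"):
        print(driver_status_url(base, "driver-20240101000000-0000"))
    # http://h1:6066/v1/submissions/status/driver-20240101000000-0000
    # http://h2:6066/v1/submissions/status/driver-20240101000000-0000
```

Returning one URL per master, in the order given, is what lets a poller fail over simply by walking the list.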
   
   ### `triggers/spark_submit.py` (new)
   - `SparkDriverTrigger`: an async trigger that polls the Spark standalone REST API via `aiohttp`
   - Supports HA: tries each master URL in order on every poll
   - Treats `RELAUNCHING` and `UNKNOWN` driver states as still alive (the master may be in recovery)
   - Implements `serialize()` so Airflow can re-create the trigger after a crash
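
The trigger's polling loop can be sketched with `asyncio` alone. In this hypothetical `DriverTriggerSketch`, the HTTP call is injected as a `fetch` coroutine (the PR makes it with `aiohttp`), `OSError` stands in for a connection failure, and the classpath returned by `serialize()` is invented. Terminal states are taken from Spark's standalone driver states (`FINISHED`, `FAILED`, `KILLED`, `ERROR`); every other state, including `RELAUNCHING` and `UNKNOWN`, keeps the loop polling.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

# Terminal Spark standalone driver states; anything else (SUBMITTED, RUNNING,
# RELAUNCHING, UNKNOWN) is treated as still alive.
TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED", "ERROR"}

# Hypothetical fetcher: (master_url, driver_id) -> driver state reported by that master.
Fetcher = Callable[[str, str], Awaitable[str]]


class DriverTriggerSketch:
    """Hypothetical, Airflow-free model of an HA-aware driver-polling trigger."""

    def __init__(self, driver_id: str, master_urls: list[str], poll_interval: float = 5.0) -> None:
        self.driver_id = driver_id
        self.master_urls = master_urls
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Same (classpath, kwargs) shape Airflow triggers return; the classpath is invented.
        return (
            "example.triggers.DriverTriggerSketch",
            {
                "driver_id": self.driver_id,
                "master_urls": self.master_urls,
                "poll_interval": self.poll_interval,
            },
        )

    async def _poll_once(self, fetch: Fetcher) -> str | None:
        # HA: try each master in order; the first one that answers wins.
        for url in self.master_urls:
            try:
                return await fetch(url, self.driver_id)
            except OSError:
                continue  # unreachable master: try the next one
        return None  # no master answered on this poll

    async def run(self, fetch: Fetcher) -> AsyncIterator[dict[str, Any]]:
        while True:
            state = await self._poll_once(fetch)
            if state in TERMINAL_STATES:
                yield {"status": state, "driver_id": self.driver_id}
                return
            # Alive (including RELAUNCHING/UNKNOWN) or unreachable: wait and poll again.
            await asyncio.sleep(self.poll_interval)
```

Because `serialize()` returns only plain values, the trigger can be rebuilt from its kwargs after a restart. This sketch keeps polling indefinitely when no master answers; a production trigger would likely bound that, for example with a timeout.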
   
   ### `provider.yaml`
   - Register the `triggers/spark_submit` module so the Airflow triggerer process discovers `SparkDriverTrigger`
   
   ### `pyproject.toml`
   - Add `aiohttp>=3.9.0` dependency (required by `SparkDriverTrigger`)
   
   ### Tests
   - `tests/triggers/test_spark_submit.py` (new): 11 tests covering `SparkDriverTrigger` serialization, the run loop, HA failover, and REST polling
   - `tests/operators/test_spark_submit.py`: a new `TestSparkSubmitOperatorDeferrable` class with 9 tests
   
   ## Relationship to #67118
   
   Both modes store the job under the same `spark_job_id` key in `task_state`, so a user can switch from `deferrable=False` to `deferrable=True` without any state migration.
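
A minimal sketch of why a shared key avoids migration, assuming both code paths record and look up the driver under the same key before submitting (the PR states only that the key is shared). `submit_or_reuse()` is a hypothetical helper, and `task_state` is modelled as a plain dict.

```python
from __future__ import annotations

from typing import Any, Callable

SPARK_JOB_ID_KEY = "spark_job_id"  # key name taken from the PR description


def submit_or_reuse(task_state: dict[str, Any], submit: Callable[[], str]) -> str:
    """Hypothetical helper: reuse a driver recorded by either mode, else submit a new one."""
    existing = task_state.get(SPARK_JOB_ID_KEY)
    if existing:
        return existing  # a previous attempt (sync or deferrable) already submitted
    driver_id = submit()
    task_state[SPARK_JOB_ID_KEY] = driver_id
    return driver_id


if __name__ == "__main__":
    state: dict[str, Any] = {}
    first = submit_or_reuse(state, lambda: "driver-1")  # e.g. a deferrable=False attempt
    second = submit_or_reuse(state, lambda: "driver-2")  # a later deferrable=True attempt
    print(first, second)  # driver-1 driver-1
```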


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
