amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3315855587
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -712,6 +802,125 @@ def _process_spark_submit_log(self, itr: Iterator[Any])
-> None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll the YARN RM REST API until ``app.finalStatus`` reaches a
terminal value."""
+ self.log.info(
+ "Tracking YARN application %s via ResourceManager REST API
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 1)
+ # Tolerate transient RM REST API failures (RM hiccup, network blip,
request
+ # timeout) the same way `_start_driver_status_tracking` does for spark
+ # standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling YARN RM REST API for application %s",
application_id)
+ try:
+ final_status =
self._query_yarn_application_final_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive YARN RM REST
API "
+ f"failures. Last error: {exc}"
+ ) from exc
+ self.log.warning(
+ "Transient YARN RM REST API failure (%d/%d): %s",
+ consecutive_failures,
+ max_consecutive_failures,
+ exc,
+ )
+ time.sleep(poll_interval)
+ continue
+ consecutive_failures = 0
+ if final_status == self._YARN_FINAL_SUCCESS:
+ self.log.info("YARN application %s finished with SUCCEEDED",
application_id)
+ return
+ if final_status in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with final
status: {final_status}"
+ )
+ if final_status != self._YARN_FINAL_UNDEFINED:
+ raise RuntimeError(
+ f"YARN application {application_id} returned unexpected
final status: {final_status}"
+ )
+ time.sleep(poll_interval)
Review Comment:
The loop only reads `finalStatus` and treats `UNDEFINED` as "still running."
When the AM dies before registering with the RM, YARN sets `state=FAILED` but
leaves `finalStatus=UNDEFINED`, because there was never a final status to write.
In that case this loop will never exit. The query itself succeeds, so the
consecutive-failure limit never triggers either. It should also read
`app["state"]` and raise immediately if the state is in `("FAILED", "KILLED")`,
regardless of `finalStatus`. The `_rm_status_resp` test helper already includes
a `state` field, but the code never reads it.
##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -157,7 +175,7 @@ def __init__(
proxy_user: str | None = None,
name: str = "arrow-spark",
num_executors: int | None = None,
- status_poll_interval: int = 1,
+ status_poll_interval: int = 10,
Review Comment:
Changing the default from 1 to 10 affects all existing users, not just those
who set `yarn_track_via_rm_api=True`. Someone running short Spark jobs on the
pre-existing (non-RM-API) codepath will now wait up to 10s instead of 1s to
detect completion. Their only opt-out is to pass `status_poll_interval=1`
explicitly. The RM REST path can enforce its own floor (you already have
`max(self._status_poll_interval, 1)`), so the default for the pre-existing
codepath should stay at 1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]