nailo2c commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3315295653


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +792,133 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) 
-> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll the YARN RM REST API until ``app.finalStatus`` reaches a 
terminal value."""
+        self.log.info(
+            "Tracking YARN application %s via ResourceManager REST API 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient RM REST API failures (RM hiccup, network blip, 
request
+        # timeout) the same way `_start_driver_status_tracking` does for spark
+        # standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling YARN RM REST API for application %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive YARN RM REST 
API "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient YARN RM REST API failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", 
application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final 
status: {final_status}"
+                )
+            if final_status != self._YARN_FINAL_UNDEFINED:
+                raise RuntimeError(
+                    f"YARN application {application_id} returned unexpected 
final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _get_yarn_rm_base_url(self) -> str:
+        """
+        Resolve the YARN ResourceManager webapp base URL from the Spark 
connection.
+
+        Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark
+        connection's ``extra`` JSON. Bare ``host:port`` values get ``http://``
+        prepended; fully-qualified URLs are used as-is. Trailing slashes 
stripped.
+        The resolved URL is cached on the hook instance so the polling loop 
does
+        not re-fetch the connection (or re-hit any Secrets Backend) on every 
iteration.
+        """
+        if self._yarn_rm_base_url is not None:
+            return self._yarn_rm_base_url
+        try:
+            conn = self.get_connection(self._conn_id)
+        except AirflowNotFoundException:
+            conn = None
+        raw = ""
+        if conn is not None:
+            raw = 
(conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip()
+        if not raw:
+            raise ValueError(
+                f"`yarn_track_via_rm_api=True` requires the Spark connection's 
`extra` to set "
+                f"`{self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY}` (e.g. 
`http://rm.example.com:8088`)."
+            )
+        url = raw if "://" in raw else f"http://{raw}"
+        self._yarn_rm_base_url = url.rstrip("/")
+        return self._yarn_rm_base_url
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """GET ``/ws/v1/cluster/apps/{id}`` once and return 
``app.finalStatus``."""
+        url = 
f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}"
+        try:
+            resp = requests.get(url, auth=self._yarn_rm_auth, 
timeout=self._HTTP_TIMEOUT)
+        except requests.exceptions.RequestException as exc:
+            raise RuntimeError(
+                f"YARN RM REST API request for application {application_id} 
failed: {exc}"
+            ) from exc
+        if resp.status_code != 200:
+            raise RuntimeError(
+                f"YARN RM REST API returned HTTP {resp.status_code} for 
application "
+                f"{application_id}: {resp.text[:200]}"
+            )
+        try:
+            return resp.json()["app"]["finalStatus"]
+        except (ValueError, KeyError, TypeError) as exc:
+            raise RuntimeError(
+                f"YARN RM REST API returned unexpected payload for application 
"
+                f"{application_id}: {resp.text[:200]}"
+            ) from exc
+
+    def _kill_yarn_application(self, application_id: str) -> None:
+        """PUT ``/ws/v1/cluster/apps/{id}/state`` to kill the application 
(best-effort)."""
+        try:
+            url = 
f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}/state"
+        except ValueError as exc:
+            self.log.warning(
+                "Cannot send YARN kill for %s: %s",
+                application_id,
+                exc,
+            )
+            return
+        try:
+            resp = requests.put(
+                url,
+                json={"state": "KILLED"},
+                auth=self._yarn_rm_auth,
+                timeout=self._HTTP_TIMEOUT,
+            )
+        except requests.exceptions.RequestException as exc:
+            self.log.warning("YARN kill request for %s failed: %s", 
application_id, exc)
+            return
+        self.log.info("YARN kill request for %s returned HTTP %s", 
application_id, resp.status_code)
+
+    @staticmethod
+    def _is_yarn_application_submitted(line: str, application_id: str) -> bool:
+        """Return whether a YARN log line means ResourceManager received the 
application."""
+        return (
+            f"Submitted application {application_id}" in line
+            or f"Application report for {application_id}" in line
+        )

Review Comment:
   Makes sense. I removed that extra log-line requirement.
   
   We now rely on the discovered YARN application id plus a successful 
`spark-submit` exit, then let RM REST API polling be the source of truth.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to