amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3311352887


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -29,15 +29,25 @@
 import uuid
 from collections.abc import Iterator
 from pathlib import Path
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
-from airflow.providers.common.compat.sdk import AirflowException, BaseHook, 
conf as airflow_conf
+import requests

Review Comment:
   requests still isn't a declared dependency in 
`providers/apache/spark/pyproject.toml`, I am doing it in 
https://github.com/apache/airflow/pull/67118 anyways, so if this lands after 
that one, we are good.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +792,133 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) 
-> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll the YARN RM REST API until ``app.finalStatus`` reaches a 
terminal value."""
+        self.log.info(
+            "Tracking YARN application %s via ResourceManager REST API 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)

Review Comment:
   `_status_poll_interval` defaults to 1s but for RM REST calls over the 
network on a job running hours, 1s means tens of thousands of requests. Worth 
either defaulting higher for the RM path (10-30s) or documenting strongly that 
users should set `status_poll_interval` higher when using this feature.
   
   



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -256,6 +308,10 @@ def _resolve_should_track_driver_status(self) -> bool:
         """
         return "spark://" in self._connection["master"] and 
self._connection["deploy_mode"] == "cluster"
 
+    def _should_track_yarn_application_via_rm_api(self) -> bool:
+        """Return whether this submit should switch to YARN RM REST API 
polling."""
+        return self._yarn_track_via_rm_api and self._is_yarn and 
self._connection["deploy_mode"] == "cluster"

Review Comment:
   If `yarn_track_via_rm_api=True` but `deploy_mode=client`, this silently 
returns False and falls back to blocking spark-submit with no signal to the 
user. 
   
   Should add a log here or raise at submit() time so users are not confused 
when the flag appears to have no effect.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +792,133 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) 
-> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll the YARN RM REST API until ``app.finalStatus`` reaches a 
terminal value."""
+        self.log.info(
+            "Tracking YARN application %s via ResourceManager REST API 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient RM REST API failures (RM hiccup, network blip, 
request
+        # timeout) the same way `_start_driver_status_tracking` does for spark
+        # standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling YARN RM REST API for application %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive YARN RM REST 
API "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient YARN RM REST API failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", 
application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final 
status: {final_status}"
+                )
+            if final_status != self._YARN_FINAL_UNDEFINED:
+                raise RuntimeError(
+                    f"YARN application {application_id} returned unexpected 
final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _get_yarn_rm_base_url(self) -> str:
+        """
+        Resolve the YARN ResourceManager webapp base URL from the Spark 
connection.
+
+        Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark
+        connection's ``extra`` JSON. Bare ``host:port`` values get ``http://``
+        prepended; fully-qualified URLs are used as-is. Trailing slashes 
stripped.
+        The resolved URL is cached on the hook instance so the polling loop 
does
+        not re-fetch the connection (or re-hit any Secrets Backend) on every 
iteration.
+        """
+        if self._yarn_rm_base_url is not None:
+            return self._yarn_rm_base_url
+        try:
+            conn = self.get_connection(self._conn_id)
+        except AirflowNotFoundException:
+            conn = None
+        raw = ""
+        if conn is not None:
+            raw = 
(conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip()
+        if not raw:
+            raise ValueError(
+                f"`yarn_track_via_rm_api=True` requires the Spark connection's 
`extra` to set "
+                f"`{self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY}` (e.g. 
`http://rm.example.com:8088`)."
+            )
+        url = raw if "://" in raw else f"http://{raw}";
+        self._yarn_rm_base_url = url.rstrip("/")
+        return self._yarn_rm_base_url
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """GET ``/ws/v1/cluster/apps/{id}`` once and return 
``app.finalStatus``."""
+        url = 
f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}"
+        try:
+            resp = requests.get(url, auth=self._yarn_rm_auth, 
timeout=self._HTTP_TIMEOUT)
+        except requests.exceptions.RequestException as exc:
+            raise RuntimeError(
+                f"YARN RM REST API request for application {application_id} 
failed: {exc}"
+            ) from exc
+        if resp.status_code != 200:
+            raise RuntimeError(
+                f"YARN RM REST API returned HTTP {resp.status_code} for 
application "
+                f"{application_id}: {resp.text[:200]}"
+            )
+        try:
+            return resp.json()["app"]["finalStatus"]
+        except (ValueError, KeyError, TypeError) as exc:
+            raise RuntimeError(
+                f"YARN RM REST API returned unexpected payload for application 
"
+                f"{application_id}: {resp.text[:200]}"
+            ) from exc
+
+    def _kill_yarn_application(self, application_id: str) -> None:
+        """PUT ``/ws/v1/cluster/apps/{id}/state`` to kill the application 
(best-effort)."""
+        try:
+            url = 
f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}/state"
+        except ValueError as exc:
+            self.log.warning(
+                "Cannot send YARN kill for %s: %s",
+                application_id,
+                exc,
+            )
+            return
+        try:
+            resp = requests.put(
+                url,
+                json={"state": "KILLED"},
+                auth=self._yarn_rm_auth,
+                timeout=self._HTTP_TIMEOUT,
+            )
+        except requests.exceptions.RequestException as exc:
+            self.log.warning("YARN kill request for %s failed: %s", 
application_id, exc)
+            return
+        self.log.info("YARN kill request for %s returned HTTP %s", 
application_id, resp.status_code)
+
+    @staticmethod
+    def _is_yarn_application_submitted(line: str, application_id: str) -> bool:
+        """Return whether a YARN log line means ResourceManager received the 
application."""
+        return (
+            f"Submitted application {application_id}" in line
+            or f"Application report for {application_id}" in line
+        )

Review Comment:
   Hard dependency on spark job's "Submitted application" log line appearing in 
stderr. Log level filtering, custom log formatters, or a future Spark version 
changing this message would cause the signal to never fire — raising a 
confusing error even though the app launched. 
   
   Since the app ID is already extracted from logs, is the second "submitted" 
signal actually needed, or can this check be dropped?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to