Copilot commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3213029803


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> 
None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is 
reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, 
network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does 
for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive `yarn 
application -status` "
+                        f"failures. Last error: {exc}"
+                    )

Review Comment:
   The retry budget logic is off by one: the loop tolerates 10 consecutive 
failures and raises only on the 11th (`>`), while the raised message says 
"after 10 consecutive ... failures". Either raise when 
`consecutive_failures >= max_consecutive_failures` or update the message to 
reflect the actual threshold. Also consider exception chaining 
(`raise ... from exc`) to preserve the original error context.
   



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -99,13 +99,26 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         job finishes (on both success and on_kill). Useful for cleaning up 
sidecars such
         as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``). 
Each command
         is executed via the shell; failures produce a warning but do not fail 
the task.
+    :param yarn_track_via_application_status: If True (and deploy_mode is YARN 
cluster),
+        release the ``spark-submit`` JVM once the application has been 
submitted to
+        YARN, then poll ``yarn application -status <appId>`` every
+        ``status_poll_interval`` seconds until the application reaches a final 
state.
+        This frees the worker from holding the long-lived submit JVM. Requires 
the
+        ``yarn`` CLI on the worker. Cluster-side driver logs should be used 
after the
+        switch to polling. Defaults to ``False``.

Review Comment:
   Docstring wording is confusing: `deploy_mode` is not "YARN cluster"; YARN is 
the master and `deploy_mode` is `cluster`. Consider rephrasing to something 
like "when master is YARN and deploy_mode is cluster" to match the actual 
configuration fields.
   



##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py:
##########
@@ -1006,6 +1006,30 @@ def test_yarn_process_on_kill(self, mock_popen, 
mock_renew_from_kt):
             in mock_popen.mock_calls
         )
 
+    
@patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+    def 
test_yarn_process_on_kill_kills_application_after_submit_process_exits(self, 
mock_popen):
+        submit_process = MagicMock()
+        submit_process.poll.return_value = 0
+

Review Comment:
   New MagicMock instances are created without a `spec`/`autospec`. Using a 
spec (e.g., `spec=subprocess.Popen`) helps catch typos and mismatched APIs in 
tests.



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -86,6 +86,13 @@ class SparkSubmitOperator(BaseOperator):
                            on keytab for Kerberos login
     :param post_submit_commands: Optional list of shell commands to run after 
the Spark job finishes.
         Useful for cleaning up sidecars such as Istio. Failures produce a 
warning but do not fail the task.
+    :param yarn_track_via_application_status: If True (and deploy_mode is YARN 
cluster),
+        release the ``spark-submit`` JVM once the application has been 
submitted to
+        YARN, then poll ``yarn application -status <appId>`` every
+        ``status_poll_interval`` seconds until the application reaches a final 
state.
+        This frees the worker from holding the long-lived submit JVM. Requires 
the

Review Comment:
   Docstring wording is confusing: `deploy_mode` is not "YARN cluster"; YARN is 
the master and `deploy_mode` is `cluster`. Consider rephrasing to something 
like "when master is YARN and deploy_mode is cluster" to match the actual 
configuration fields.



##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py:
##########
@@ -1336,3 +1360,305 @@ def 
test_post_submit_commands_none_gives_empty_list(self):
         """Test that None post_submit_commands results in an empty list."""
         hook = SparkSubmitHook(conn_id="")
         assert hook._post_submit_commands == []
+
+    # ---------------------------------------------------------------
+    # yarn_track_via_application_status (issue #24171)
+    # ---------------------------------------------------------------
+
+    _YARN_LOG_LINES = [
+        "INFO Client: Requesting a new application from cluster with 1 
NodeManagers",
+        "INFO Client: Uploading resource file:/tmp/lib.zip -> "
+        
"hdfs://namenode:8020/user/root/.sparkStaging/application_1700000000000_0001/lib.zip",
+        "INFO Client: Submitting application application_1700000000000_0001 to 
ResourceManager",
+        "INFO YarnClientImpl: Submitted application 
application_1700000000000_0001",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: ACCEPTED)",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: RUNNING)",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: FINISHED)",
+        "INFO Client: final status: SUCCEEDED",
+    ]
+
+    @staticmethod
+    def _yarn_status_output(final_state: str) -> str:
+        return (
+            "Application Report :\n"
+            "\tApplication-Id : application_1700000000000_0001\n"
+            "\tApplication-Name : test\n"
+            "\tState : FINISHED\n"
+            f"\tFinal-State : {final_state}\n"
+        )
+
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.run")
+    
@patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+    def test_default_keeps_existing_behavior_in_yarn_cluster(self, mock_popen, 
mock_run):
+        """Without the flag, submit() must not call `yarn application 
-status`."""
+        proc = MagicMock()
+        proc.stdout = iter(self._YARN_LOG_LINES)
+        proc.wait.return_value = 0

Review Comment:
   New MagicMock instances are created without a `spec`/`autospec`. Using a 
spec (e.g., `spec=subprocess.Popen`) helps catch typos and mismatched APIs in 
tests.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> 
None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is 
reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, 
network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does 
for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive `yarn 
application -status` "
+                        f"failures. Last error: {exc}"
+                    )
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", 
application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final 
status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _build_yarn_cli_env(self) -> dict[str, str]:
+        """
+        Build the env for invoking the ``yarn`` CLI.
+
+        Always merges the user-supplied ``env_vars`` (e.g. 
``HADOOP_CONF_DIR``).
+        When the connection has both a keytab and a principal, also renews the
+        Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+        can authenticate without piggy-backing on the spark-submit JVM. Renewal
+        failures are tolerated for the same reason as in ``on_kill``: the
+        failure may just be a non-renewable ticket and we still want to attempt
+        the CLI call.
+        """
+        env = {**os.environ, **(self._env or {})}
+        if self._connection["keytab"] is not None and 
self._connection["principal"] is not None:
+            renew_from_kt(self._connection["principal"], 
self._connection["keytab"], exit_on_fail=False)
+            env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", 
"ccache")
+        return env
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """Run ``yarn application -status <id>`` once and return the 
Final-State string."""
+        cmd = ["yarn", "application", "-status", application_id]
+        proc = subprocess.run(
+            cmd,
+            env=self._build_yarn_cli_env(),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            text=True,
+            check=False,
+        )

Review Comment:
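   `subprocess.run()` here has no timeout, so a hung/slow `yarn application 
-status` call can block the task indefinitely. Consider adding a reasonable 
`timeout` and handling `subprocess.TimeoutExpired` like the other transient 
failures. Note that `TimeoutExpired` is not a `RuntimeError`, so the existing 
`except RuntimeError` in `_track_yarn_application` would not catch it unless 
it is converted or caught explicitly.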
   



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> 
None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is 
reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' 
polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, 
network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does 
for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive `yarn 
application -status` "
+                        f"failures. Last error: {exc}"
+                    )
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", 
application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final 
status: {final_status}"
+                )

Review Comment:
   This new YARN tracking path raises `RuntimeError` for application failures. 
Elsewhere in this hook `submit()` failures are surfaced as `AirflowException`, 
which is the typical exception type for operator/hook failures in Airflow. 
Consider raising `AirflowException` here as well for consistency and clearer 
task failure reporting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to