Copilot commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3213029803
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) ->
None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll ``yarn application -status <id>`` until a final state is
reached."""
+ self.log.info(
+ "Tracking YARN application %s via 'yarn application -status'
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 1)
+ # Tolerate transient `yarn application -status` failures (RM hiccup,
network
+ # blip, CLI timeout) the same way `_start_driver_status_tracking` does
for
+ # spark standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling: yarn application -status %s",
application_id)
+ try:
+ final_status =
self._query_yarn_application_final_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive `yarn
application -status` "
+ f"failures. Last error: {exc}"
+ )
Review Comment:
The retry budget logic is off by one: the `>` comparison tolerates 10
consecutive failures and raises only on the 11th, while the raised message says
"after 10 consecutive ... failures". Either raise when
`consecutive_failures >= max_consecutive_failures` or update the message to
reflect the actual threshold. Also consider using exception chaining
(`raise ... from exc`) to preserve the original error context.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -99,13 +99,26 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
job finishes (on both success and on_kill). Useful for cleaning up
sidecars such
as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``).
Each command
is executed via the shell; failures produce a warning but do not fail
the task.
+ :param yarn_track_via_application_status: If True (and deploy_mode is YARN
cluster),
+ release the ``spark-submit`` JVM once the application has been
submitted to
+ YARN, then poll ``yarn application -status <appId>`` every
+ ``status_poll_interval`` seconds until the application reaches a final
state.
+ This frees the worker from holding the long-lived submit JVM. Requires
the
+ ``yarn`` CLI on the worker. Cluster-side driver logs should be used
after the
+ switch to polling. Defaults to ``False``.
Review Comment:
Docstring wording is confusing: `deploy_mode` is not "YARN cluster"; YARN is
the master and `deploy_mode` is `cluster`. Consider rephrasing to something
like "when master is YARN and deploy_mode is cluster" to match the actual
configuration fields.
##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py:
##########
@@ -1006,6 +1006,30 @@ def test_yarn_process_on_kill(self, mock_popen,
mock_renew_from_kt):
in mock_popen.mock_calls
)
+
@patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+ def
test_yarn_process_on_kill_kills_application_after_submit_process_exits(self,
mock_popen):
+ submit_process = MagicMock()
+ submit_process.poll.return_value = 0
+
Review Comment:
New MagicMock instances are created without a `spec`/`autospec`. Using a
spec (e.g., `spec=subprocess.Popen`) helps catch typos and mismatched APIs in
tests.
##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -86,6 +86,13 @@ class SparkSubmitOperator(BaseOperator):
on keytab for Kerberos login
:param post_submit_commands: Optional list of shell commands to run after
the Spark job finishes.
Useful for cleaning up sidecars such as Istio. Failures produce a
warning but do not fail the task.
+ :param yarn_track_via_application_status: If True (and deploy_mode is YARN
cluster),
+ release the ``spark-submit`` JVM once the application has been
submitted to
+ YARN, then poll ``yarn application -status <appId>`` every
+ ``status_poll_interval`` seconds until the application reaches a final
state.
+ This frees the worker from holding the long-lived submit JVM. Requires
the
Review Comment:
Docstring wording is confusing: `deploy_mode` is not "YARN cluster"; YARN is
the master and `deploy_mode` is `cluster`. Consider rephrasing to something
like "when master is YARN and deploy_mode is cluster" to match the actual
configuration fields.
##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py:
##########
@@ -1336,3 +1360,305 @@ def
test_post_submit_commands_none_gives_empty_list(self):
"""Test that None post_submit_commands results in an empty list."""
hook = SparkSubmitHook(conn_id="")
assert hook._post_submit_commands == []
+
+ # ---------------------------------------------------------------
+ # yarn_track_via_application_status (issue #24171)
+ # ---------------------------------------------------------------
+
+ _YARN_LOG_LINES = [
+ "INFO Client: Requesting a new application from cluster with 1
NodeManagers",
+ "INFO Client: Uploading resource file:/tmp/lib.zip -> "
+
"hdfs://namenode:8020/user/root/.sparkStaging/application_1700000000000_0001/lib.zip",
+ "INFO Client: Submitting application application_1700000000000_0001 to
ResourceManager",
+ "INFO YarnClientImpl: Submitted application
application_1700000000000_0001",
+ "INFO Client: Application report for application_1700000000000_0001
(state: ACCEPTED)",
+ "INFO Client: Application report for application_1700000000000_0001
(state: RUNNING)",
+ "INFO Client: Application report for application_1700000000000_0001
(state: FINISHED)",
+ "INFO Client: final status: SUCCEEDED",
+ ]
+
+ @staticmethod
+ def _yarn_status_output(final_state: str) -> str:
+ return (
+ "Application Report :\n"
+ "\tApplication-Id : application_1700000000000_0001\n"
+ "\tApplication-Name : test\n"
+ "\tState : FINISHED\n"
+ f"\tFinal-State : {final_state}\n"
+ )
+
+ @patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.run")
+
@patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+ def test_default_keeps_existing_behavior_in_yarn_cluster(self, mock_popen,
mock_run):
+ """Without the flag, submit() must not call `yarn application
-status`."""
+ proc = MagicMock()
+ proc.stdout = iter(self._YARN_LOG_LINES)
+ proc.wait.return_value = 0
Review Comment:
New MagicMock instances are created without a `spec`/`autospec`. Using a
spec (e.g., `spec=subprocess.Popen`) helps catch typos and mismatched APIs in
tests.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) ->
None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll ``yarn application -status <id>`` until a final state is
reached."""
+ self.log.info(
+ "Tracking YARN application %s via 'yarn application -status'
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 1)
+ # Tolerate transient `yarn application -status` failures (RM hiccup,
network
+ # blip, CLI timeout) the same way `_start_driver_status_tracking` does
for
+ # spark standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling: yarn application -status %s",
application_id)
+ try:
+ final_status =
self._query_yarn_application_final_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive `yarn
application -status` "
+ f"failures. Last error: {exc}"
+ )
+ self.log.warning(
+ "Transient `yarn application -status` failure (%d/%d): %s",
+ consecutive_failures,
+ max_consecutive_failures,
+ exc,
+ )
+ time.sleep(poll_interval)
+ continue
+ consecutive_failures = 0
+ if final_status == self._YARN_FINAL_SUCCESS:
+ self.log.info("YARN application %s finished with SUCCEEDED",
application_id)
+ return
+ if final_status in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with final
status: {final_status}"
+ )
+ time.sleep(poll_interval)
+
+ def _build_yarn_cli_env(self) -> dict[str, str]:
+ """
+ Build the env for invoking the ``yarn`` CLI.
+
+ Always merges the user-supplied ``env_vars`` (e.g.
``HADOOP_CONF_DIR``).
+ When the connection has both a keytab and a principal, also renews the
+ Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+ can authenticate without piggy-backing on the spark-submit JVM. Renewal
+ failures are tolerated for the same reason as in ``on_kill``: the
+ failure may just be a non-renewable ticket and we still want to attempt
+ the CLI call.
+ """
+ env = {**os.environ, **(self._env or {})}
+ if self._connection["keytab"] is not None and
self._connection["principal"] is not None:
+ renew_from_kt(self._connection["principal"],
self._connection["keytab"], exit_on_fail=False)
+ env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos",
"ccache")
+ return env
+
+ def _query_yarn_application_final_status(self, application_id: str) -> str:
+ """Run ``yarn application -status <id>`` once and return the
Final-State string."""
+ cmd = ["yarn", "application", "-status", application_id]
+ proc = subprocess.run(
+ cmd,
+ env=self._build_yarn_cli_env(),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ check=False,
+ )
Review Comment:
`subprocess.run()` here is called without a `timeout`, so a hung `yarn
application -status` call can block the task indefinitely. Consider adding a
reasonable `timeout` and handling `subprocess.TimeoutExpired` the same way as
other transient failures (the polling loop shown above only catches
`RuntimeError`), so that polling stays resilient.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +753,96 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) ->
None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll ``yarn application -status <id>`` until a final state is
reached."""
+ self.log.info(
+ "Tracking YARN application %s via 'yarn application -status'
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 1)
+ # Tolerate transient `yarn application -status` failures (RM hiccup,
network
+ # blip, CLI timeout) the same way `_start_driver_status_tracking` does
for
+ # spark standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling: yarn application -status %s",
application_id)
+ try:
+ final_status =
self._query_yarn_application_final_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive `yarn
application -status` "
+ f"failures. Last error: {exc}"
+ )
+ self.log.warning(
+ "Transient `yarn application -status` failure (%d/%d): %s",
+ consecutive_failures,
+ max_consecutive_failures,
+ exc,
+ )
+ time.sleep(poll_interval)
+ continue
+ consecutive_failures = 0
+ if final_status == self._YARN_FINAL_SUCCESS:
+ self.log.info("YARN application %s finished with SUCCEEDED",
application_id)
+ return
+ if final_status in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with final
status: {final_status}"
+ )
Review Comment:
This new YARN tracking path raises `RuntimeError` when the application fails.
Elsewhere in this hook, `submit()` failures are surfaced as `AirflowException`,
which is the typical exception type for operator/hook failures in Airflow.
Consider raising `AirflowException` here as well, for consistency and clearer
task failure reporting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]