nailo2c commented on code in PR #67473:
URL: https://github.com/apache/airflow/pull/67473#discussion_r3366419825


##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -362,9 +381,10 @@ def poll_until_complete(self, external_id: JsonValue, context: Context) -> None:
         external_id = cast("str", external_id)
         if self._hook is None:
             self._hook = self._get_hook()
-        if self._hook._is_yarn:
-            # TODO: poll YARN ResourceManager until app reaches terminal state
-            raise NotImplementedError("YARN poll not yet implemented")
+        if self._hook._is_yarn_cluster_mode:
+            self._hook._start_yarn_application_status_tracking(external_id)

Review Comment:
   How about adding a `try`/`finally` here?
   
   If the YARN app ends up `FAILED` or `KILLED`,
`_start_yarn_application_status_tracking()` raises before
`_run_post_submit_commands()` is called, so the post-submit commands never run.
   
   Something like this?
   ```python
   if self._hook._is_yarn_cluster_mode:
       try:
           self._hook._start_yarn_application_status_tracking(external_id)
       finally:
           self._hook._run_post_submit_commands()
       return
   ```
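
   To illustrate why the `finally` matters, here is a minimal, self-contained
sketch. `FakeHook`, `track()`, `cleanup()` and `poll()` are hypothetical
stand-ins and not the real hook API; they only mimic tracking that raises on a
failed app, followed by a post-submit step.

```python
class FakeHook:
    """Hypothetical stand-in for the hook; not the real SparkSubmitHook."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.cleanup_ran = False

    def track(self, application_id: str) -> None:
        # Stands in for _start_yarn_application_status_tracking():
        # raises when the app reaches a failed terminal state.
        if self.fail:
            raise RuntimeError(f"{application_id} ended as FAILED")

    def cleanup(self) -> None:
        # Stands in for _run_post_submit_commands().
        self.cleanup_ran = True


def poll(hook: FakeHook, application_id: str) -> None:
    try:
        hook.track(application_id)
    finally:
        # Runs whether track() returned normally or raised.
        hook.cleanup()


failing = FakeHook(fail=True)
try:
    poll(failing, "application_0001")
except RuntimeError:
    # The original error still propagates to the caller.
    pass

ok = FakeHook(fail=False)
poll(ok, "application_0002")
```

   In both cases `cleanup_ran` ends up `True`, and the failure from `track()`
is still raised to the caller.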



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -1298,3 +1326,36 @@ def on_kill(self) -> None:
             self._kill_yarn_application(self._yarn_application_id)
 
         self._run_post_submit_commands()
+
+    def query_yarn_application_status(self, application_id: str) -> str:
+        """
+        Return a normalised single string status for the ResumableJobMixin interface.
+
+        - Active states (NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING) are returned as-is.
+        - Terminal states are collapsed to "SUCCEEDED" or "FAILED" with the following rules:
+            - FINISHED + finalStatus SUCCEEDED -> "SUCCEEDED"
+            - FINISHED + any other finalStatus -> "FAILED"
+            - FAILED or KILLED -> "FAILED"
+        """
+        state, final_status = self._query_yarn_application_status(application_id)
+        if state in {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"}:
+            return state
+        if state == "FINISHED" and final_status == self._YARN_FINAL_SUCCESS:
+            return "SUCCEEDED"
+        return "FAILED"
+
+    def kill_yarn_application(self, application_id: str) -> None:

Review Comment:
   This should delegate to `_kill_yarn_application()`; otherwise
`SparkSubmitOperator.on_kill()` uses the unauthenticated RM REST `PUT` and
fails on Kerberized YARN.
   
   More details are in my PR-level comment.
   
   ```python
   def kill_yarn_application(self, application_id: str) -> None:
       """Public alias for ResumableJobMixin / operator on_kill paths."""
       self._kill_yarn_application(application_id)
   ```
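
   As a rough sketch of the delegation idea, with a hypothetical `FakeHook`
that only records which path was taken (the real private method's behaviour is
not reproduced here):

```python
class FakeHook:
    """Hypothetical stand-in; only records which kill path was used."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    def _kill_yarn_application(self, application_id: str) -> None:
        # Stands in for the existing private kill path in the hook.
        self.calls.append(("private", application_id))

    def kill_yarn_application(self, application_id: str) -> None:
        # Public alias: no second kill implementation, just delegation.
        self._kill_yarn_application(application_id)


hook = FakeHook()
hook.kill_yarn_application("application_0001")
```

   With this shape there is a single kill implementation, so every caller of
the public method goes through the same path as the private one.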


