amoghrajesh commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3308477371


##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,7 +226,122 @@ def execute(self, context: Context) -> None:
             self.conf = 
inject_transport_information_into_spark_properties(self.conf, context)
         if self._hook is None:
             self._hook = self._get_hook()
-        self._hook.submit(self.application)
+        hook = self._hook
+        if hook._should_track_driver_status:
+            if self.reconnect_on_retry:
+                return self.execute_resumable(context)
+            # reconnect_on_retry=False: still submit-and-poll, just skip 
task_state persistence.
+            driver_id = self.submit_job(context)
+            self.poll_until_complete(driver_id, context)
+            return self.get_job_result(driver_id, context)
+        hook.submit(self.application)
+
+    def submit_job(self, context: Context) -> str:
+        if self._hook is None:
+            self._hook = self._get_hook()
+        driver_id = self._hook.submit(self.application)
+        if not driver_id:
+            raise RuntimeError("spark-submit did not return a driver ID")
+        self.log.info("Spark driver submitted: %s", driver_id)
+        return driver_id
+
+    def get_job_status(self, external_id: JsonValue) -> str:
+        # called from submit_job which always returns a str (Spark driver IDs 
are strings)
+        external_id = cast("str", external_id)
+        if self._hook is None:
+            self._hook = self._get_hook()
+        # The YARN and K8s branches below (and in is_job_active, 
is_job_succeeded, poll_until_complete)
+        # are currently unreachable: execute_resumable is only called when 
_should_track_driver_status
+        # is True, which requires spark:// + cluster mode. They are 
scaffolding for a follow-up PR
+        # that extends ResumableJobMixin support to YARN and Kubernetes.
+        if self._hook._is_yarn:
+            # TODO: call YARN ResourceManager REST API
+            # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+            raise NotImplementedError("YARN job status not yet implemented")
+        if self._hook._is_kubernetes:
+            # TODO: call K8s pod status API
+            raise NotImplementedError("K8s job status not yet implemented")
+        scheme = self._hook._connection.get("rest_scheme", "http")
+        rest_port = self._hook._connection.get("rest_port", 6066)
+        # HA master URLs can look like spark://m1:7077,m2:7077 — try each host 
in order.
+        # The master URL port (e.g. 7077) is the RPC port — not the REST API 
port.
+        # Use rest-port connection extra to override spark.master.rest.port 
(default 6066).
+        master_urls = self._hook._connection["master"].replace("spark://", 
"").split(",")
+        last_exc: Exception = RuntimeError("No Spark masters to query")
+        for m in master_urls:
+            host = m.strip().split(":")[0]
+            url = 
f"{scheme}://{host}:{rest_port}/v1/submissions/status/{external_id}"
+            try:
+                status = self._fetch_driver_status(url, external_id)
+                return status
+            except RuntimeError:
+                raise

Review Comment:
   Good catch. I originally re-raised the `RuntimeError` on the reasoning that 
"if the driver actually failed, do not bother trying other masters", but you are 
right that `success: false` during HA recovery looks identical to a driver 
failure from the REST response's perspective, and m1 being temporarily unaware 
of the driver is exactly what HA is meant to tolerate. Removing the special case 
and letting all exceptions fall through to `continue` is cleaner. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to