kaxil commented on code in PR #68067:
URL: https://github.com/apache/airflow/pull/68067#discussion_r3392350145


##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -290,9 +301,23 @@ def execute(self, context: Context) -> None:
                 return self.get_job_result(driver_id, context)
         hook.submit(self.application)
 
-    def submit_job(self, context: Context) -> str:
+    def submit_job(self, context: Context) -> str | None:
         if self._hook is None:
             self._hook = self._get_hook()
+        if self._hook._is_kubernetes:
+            self._hook._conf[_K8S_WAIT_APP_COMPLETION_CONF] = "false"
+            self._hook.submit(self.application)
+            pod_name = self._hook._kubernetes_driver_pod
+            namespace = self._hook._connection["namespace"]
+            if not pod_name:
+                self.log.warning(
+                    "spark-submit did not capture a K8s driver pod name; "
+                    "crash recovery will not be available for this run"
+                )
+                return None

Review Comment:
   If the pod name isn't captured, the run doesn't continue in a degraded mode; 
it fails a moment later. The mixin still calls `poll_until_complete(None, 
...)`, and since `_kubernetes_driver_pod` is also unset, 
`_poll_k8s_driver_via_api()` raises `ValueError("K8s driver pod name not set; 
cannot poll status.")`. Meanwhile, the job was already submitted and keeps 
running, so the retry submits a duplicate, which is the exact failure this 
feature is meant to prevent.
   
   The YARN branch raises right at submit for the same situation (`spark-submit 
did not produce a YARN application ID`); raising here too would fail with a 
clearer error instead of a warning that suggests the run will carry on.
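
A minimal sketch of the fail-fast behavior described above, not the PR's actual code. `FakeHook`, `submit_k8s_job`, the `RuntimeError` type, and the error text are all hypothetical stand-ins; only the `namespace:pod_name` external ID format comes from the diff.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeHook:
    """Hypothetical stand-in for the Spark hook; only what the sketch needs."""

    driver_pod: Optional[str] = None
    namespace: str = "default"
    submitted: List[str] = field(default_factory=list)

    def submit(self, application: str) -> None:
        # Record the submission; a real hook would run spark-submit here.
        self.submitted.append(application)


def submit_k8s_job(hook: FakeHook, application: str) -> str:
    """Submit, then fail immediately if no driver pod name was captured."""
    hook.submit(application)
    if not hook.driver_pod:
        # Raise at submit time instead of logging a warning and returning None,
        # so the error points at the real cause rather than a later poll failure.
        raise RuntimeError("spark-submit did not produce a K8s driver pod name")
    return f"{hook.namespace}:{hook.driver_pod}"
```

Raising here does not stop the already-submitted job; it only makes the failure explicit at the point where the missing pod name is detected.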



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -321,12 +346,31 @@ def get_job_status(self, external_id: JsonValue, context: Context) -> str:
         if self._hook._is_yarn_cluster_mode:
             return self._hook.query_yarn_application_status(external_id)
         if self._hook._is_kubernetes:
-            # The K8s branches below (and in is_job_active, is_job_succeeded, poll_until_complete)
-            # are currently unreachable: execute_resumable is only called when _should_track_driver_status
-            # is True, which requires spark:// + cluster mode. They are scaffolding for a follow-up PR
-            # that extends ResumableJobMixin support to Kubernetes.
-            # TODO: call K8s pod status API
-            raise NotImplementedError("K8s job status not yet implemented")
+            task_store = context.get("task_store")
+            if task_store is not None:
+                cached = task_store.get(self._K8S_DRIVER_STATUS_KEY)
+                if cached:
+                    if TYPE_CHECKING:
+                        assert isinstance(cached, str)
+                    return cached
+            if kube_client is None:
+                raise RuntimeError(
+                    "apache-airflow-providers-cncf-kubernetes is required to query K8s pod status"
+                )
+            parts = external_id.split(":", 1)
+            if len(parts) != 2:
+                raise ValueError(
+                    f"Invalid K8s external ID format {external_id!r}; expected 'namespace:pod_name'"
+                )
+            namespace, pod_name = parts
+            try:
+                client = kube_client.get_kube_client()
+                pod = client.read_namespaced_pod(pod_name, namespace)
+                return pod.status.phase

Review Comment:
   `pod.status.phase` can be `None` on a freshly scheduled pod, which is why 
the hook's poll loop guards with `phase = pod.status.phase or "Initializing"`. 
If that happens here, the mixin calls `is_job_active(None)` and 
`status.upper()` raises `AttributeError`.
   
   ```suggestion
                   return pod.status.phase or "Pending"
   ```
   This keeps the `str` contract and maps to "still active, reconnect".
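
A small sketch of why the fallback matters, under stated assumptions: `is_job_active` below is a hypothetical reduction of the mixin method to the `status.upper()` call mentioned above, `ACTIVE_PHASES` is an assumed set of active states, and the pod is faked with `SimpleNamespace` rather than a real Kubernetes client object.

```python
from types import SimpleNamespace

# Assumed set of phases treated as "still active"; not taken from the PR.
ACTIVE_PHASES = {"PENDING", "RUNNING"}


def is_job_active(status: str) -> bool:
    """Hypothetical reduction of the mixin check; calls .upper() on the status."""
    return status.upper() in ACTIVE_PHASES


def phase_or_pending(pod) -> str:
    """Return the pod phase, falling back to "Pending" when it is None."""
    return pod.status.phase or "Pending"


# A freshly scheduled pod may not have a phase yet.
fresh_pod = SimpleNamespace(status=SimpleNamespace(phase=None))
```

With the fallback, `is_job_active(phase_or_pending(fresh_pod))` treats the pod as active, whereas passing the raw `None` phase would raise `AttributeError` inside `status.upper()`.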



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -269,11 +279,12 @@ def execute(self, context: Context) -> None:
             self.poll_until_complete(driver_id, context)
             return self.get_job_result(driver_id, context)
         if hook._should_track_driver_via_k8s_api():
-            # TODO: Wire into execute_resumable() via ResumableJobMixin
-            # (fill submit_job / poll_until_complete K8s stubs) to enable crash recovery.
-            hook.submit(self.application)
-            hook._poll_k8s_driver_via_api()
-            return
+            if self.reconnect_on_retry:
+                return self.execute_resumable(context)
+            # reconnect_on_retry=False: still submit-and-poll, just skip task_state persistence.

Review Comment:
   This path still writes `k8s_driver_status="Succeeded"` to the task store via 
`poll_until_complete`, so persistence isn't fully skipped. Mostly cosmetic 
today, but nothing ever clears that key, so if the operator is later switched 
to `reconnect_on_retry=True`, a retry there reads the stale `"Succeeded"` 
before the live pod phase and can skip a job that is still running. Maybe clear 
or overwrite the key when submitting fresh?
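
One way the "clear on fresh submit" idea could look, as a sketch only. The task store is modeled as a plain `dict` because its real write/delete API is not shown in the diff; `submit_fresh` and `cached_status` are hypothetical helpers, and the key string is assumed from the `k8s_driver_status` name mentioned above.

```python
from typing import Callable, Dict, Optional

# Assumed value of the operator's _K8S_DRIVER_STATUS_KEY.
K8S_DRIVER_STATUS_KEY = "k8s_driver_status"


def cached_status(task_store: Dict[str, str]) -> Optional[str]:
    """Return the cached driver status, if any (mirrors the cache-first read)."""
    return task_store.get(K8S_DRIVER_STATUS_KEY)


def submit_fresh(task_store: Dict[str, str], submit: Callable[[], str]) -> str:
    """Drop any stale terminal status before starting a new submission."""
    # Without this, a leftover "Succeeded" from an earlier run would be read
    # before the live pod phase on a later retry.
    task_store.pop(K8S_DRIVER_STATUS_KEY, None)
    return submit()
```

Clearing at submit time keeps the cache-first read in `get_job_status` safe regardless of which `reconnect_on_retry` setting wrote the key.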



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
