kaxil commented on code in PR #67715:
URL: https://github.com/apache/airflow/pull/67715#discussion_r3325823484
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
+ app_id = self._kubernetes_application_id or pod_name
+
+ if not pod_name:
+ raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+ client = kube_client.get_kube_client(in_cluster=False)
Review Comment:
Hardcoding `in_cluster=False` forces `config.load_kube_config()`, which
needs a kubeconfig at `~/.kube/config`. When Airflow workers run inside the
cluster (KubernetesExecutor, Airflow-on-K8s) that file doesn't exist, so this
poll loop can't authenticate. `get_kube_client()` with no arg resolves
`in_cluster` from `[kubernetes_executor] in_cluster`. Drop the arg and let it
resolve from config, or read it from the Spark connection extra, so both in-
and out-of-cluster workers are covered.
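Here is a minimal sketch of the second option. It assumes a hypothetical `in_cluster` key in the Spark connection extra, which the hook does not read today. When that key is absent, the helper calls `get_kube_client()` with no argument, so the `[kubernetes_executor] in_cluster` setting applies. `get_kube_client` is passed in as a parameter so the sketch runs without Airflow installed.

```python
from typing import Any, Callable, Mapping, Optional


def resolve_in_cluster(extra: Mapping[str, Any]) -> Optional[bool]:
    """Return an explicit in_cluster override from the connection extra, or None.

    The "in_cluster" extra key is hypothetical; it is not an existing option.
    """
    value = extra.get("in_cluster")
    if value is None:
        return None  # no override: defer to Airflow config
    if isinstance(value, bool):
        return value
    # Connection extras are often strings, e.g. "true" / "false".
    return str(value).strip().lower() in ("true", "1", "yes")


def build_client(get_kube_client: Callable[..., Any], extra: Mapping[str, Any]) -> Any:
    """Create a kube client without hardcoding in_cluster=False."""
    in_cluster = resolve_in_cluster(extra)
    if in_cluster is None:
        # No argument: in_cluster is resolved from [kubernetes_executor] in_cluster.
        return get_kube_client()
    return get_kube_client(in_cluster=in_cluster)
```

With no override, in-cluster workers pick up the service-account config and out-of-cluster workers keep using a kubeconfig. The extra is only needed for a worker whose Airflow config does not match where it actually runs.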
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -865,7 +968,7 @@ def on_kill(self) -> None:
try:
import kubernetes
- client = kube_client.get_kube_client()
+ client = kube_client.get_kube_client(in_cluster=False)
Review Comment:
Same issue here, and this one is a regression: `on_kill` previously called
`get_kube_client()` (resolving `in_cluster` from config). Forcing `False`
breaks driver-pod cleanup on kill for in-cluster deployments. Suggest reverting
to `get_kube_client()`.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
+ app_id = self._kubernetes_application_id or pod_name
+
+ if not pod_name:
+ raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+ client = kube_client.get_kube_client(in_cluster=False)
+ poll_interval = max(self._status_poll_interval, 20)
+ # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+ # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+ consecutive_unknown = 0
+ max_consecutive_unknown = 3
+
+ while True:
+ pod = client.read_namespaced_pod(pod_name, namespace)
Review Comment:
`read_namespaced_pod` is called bare inside `while True`. For the
long-running jobs this feature targets, a transient `ApiException` (network
blip, API token refresh, a brief 404 right after submit) crashes the loop and
fails the task while the Spark job is still healthy.
`_start_driver_status_tracking` handles exactly this with its
`missed_job_status_reports` / `max_missed_job_status_reports` budget. Wrap the
read in the same retry tolerance.
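One way to reuse that budget is a small standalone helper. `read_pod` stands in for `lambda: client.read_namespaced_pod(pod_name, namespace)`, and `retryable` stands in for the Kubernetes client's `ApiException` (`kubernetes.client.exceptions.ApiException`). The helper name and its policy of raising after `max_missed` consecutive misses are assumptions modelled on `missed_job_status_reports`; they are not the hook's existing code.

```python
import time
from typing import Any, Callable, Tuple, Type


def read_pod_tolerant(
    read_pod: Callable[[], Any],
    retryable: Tuple[Type[BaseException], ...],
    max_missed: int = 3,
    interval: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call read_pod(), tolerating up to max_missed consecutive retryable errors."""
    missed = 0
    while True:
        try:
            return read_pod()
        except retryable:
            missed += 1
            if missed > max_missed:
                raise  # budget exhausted: surface the last API error
            sleep(interval)  # transient blip: wait one poll interval and retry
```

The counter lives inside the helper, so it resets after every successful read. That matches the "consecutive" semantics of the existing `Unknown` tolerance. A 404 right after submit also surfaces as an `ApiException`, so it is absorbed by the same budget.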
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
+ app_id = self._kubernetes_application_id or pod_name
+
+ if not pod_name:
+ raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+ client = kube_client.get_kube_client(in_cluster=False)
+ poll_interval = max(self._status_poll_interval, 20)
+ # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+ # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+ consecutive_unknown = 0
+ max_consecutive_unknown = 3
+
+ while True:
Review Comment:
This loop only exits on `Succeeded`, `Failed`, or repeated `Unknown`. A pod
stuck in `Pending` (unschedulable, an ImagePullBackOff that never terminates)
polls forever and relies on `execution_timeout` being set. Consider a max
wall-clock bound or surfacing prolonged `Pending`.
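Here is a sketch of a wall-clock bound on prolonged `Pending`. It assumes a hypothetical `max_pending_seconds` setting, and the poll loop would call `guard.observe(phase)` once per iteration. The clock is injectable for testing and defaults to `time.monotonic`, which is unaffected by system clock changes.

```python
import time
from typing import Callable, Optional


class PendingTimeout:
    """Hypothetical helper: fail if the driver pod stays Pending for too long."""

    def __init__(self, max_pending_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.max_pending_seconds = max_pending_seconds
        self.clock = clock
        self._pending_since: Optional[float] = None

    def observe(self, phase: str) -> None:
        if phase != "Pending":
            self._pending_since = None  # left Pending: reset the timer
            return
        now = self.clock()
        if self._pending_since is None:
            self._pending_since = now  # first Pending observation
        elif now - self._pending_since > self.max_pending_seconds:
            raise TimeoutError(
                f"driver pod has been Pending for more than {self.max_pending_seconds}s "
                "(unschedulable or image pull failing?)"
            )
```

An unpullable image keeps the pod in phase `Pending`, so this bound also catches the ImagePullBackOff case. For a more specific error message, the container `waiting` reason could be added to it.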
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
+ app_id = self._kubernetes_application_id or pod_name
+
+ if not pod_name:
+ raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+ client = kube_client.get_kube_client(in_cluster=False)
+ poll_interval = max(self._status_poll_interval, 20)
+ # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+ # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+ consecutive_unknown = 0
+ max_consecutive_unknown = 3
+
+ while True:
+ pod = client.read_namespaced_pod(pod_name, namespace)
+ phase = pod.status.phase or "Initializing"
+ self.log.info("Application status for %s (phase: %s)", app_id, phase)
+ if phase == "Succeeded":
+ break
+ if phase == "Failed":
+ container_state = ""
+ if pod.status.container_statuses:
+ cs = pod.status.container_statuses[0]
+ if cs.state and cs.state.terminated:
+ container_state = (
+ f" exit_code={cs.state.terminated.exit_code} reason={cs.state.terminated.reason}"
+ )
+ raise RuntimeError(f"Spark application {app_id} failed (phase=Failed{container_state})")
Review Comment:
`_run_post_submit_commands()` only runs on the success path below; the
`Failed` and `Unknown` branches raise before reaching it. The existing contract
(asserted by the standalone test "must be called even on driver failure") is
that post-submit cleanup runs on failure too, so Istio-style sidecars leak when
the K8s job fails. Run the post-submit commands on all terminal exits, e.g. via
`finally`.
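This sketch shows the `finally` shape. The pod-phase reads are replaced by an iterable of phases so it runs standalone, and `post_submit` stands in for `self._run_post_submit_commands()`. The function name is illustrative.

```python
from typing import Callable, Iterable


def poll_phases(phases: Iterable[str], post_submit: Callable[[], None]) -> None:
    """Simplified stand-in for the poll loop; phases are fed in, not read from the API."""
    try:
        for phase in phases:
            if phase == "Succeeded":
                return
            if phase == "Failed":
                raise RuntimeError("Spark application failed (phase=Failed)")
        raise RuntimeError("phase stream ended without a terminal phase")
    finally:
        # Post-submit commands (e.g. stopping an Istio sidecar) run on every
        # exit path: success, Failed, and any other raise inside the loop.
        post_submit()
```

One caveat with `finally`: if the post-submit command itself raises, that exception propagates instead of the original `RuntimeError`, which is then only visible as `__context__`. It may be worth logging cleanup failures rather than raising them on the error path.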
##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -234,6 +242,13 @@ def execute(self, context: Context) -> None:
driver_id = self.submit_job(context)
self.poll_until_complete(driver_id, context)
return self.get_job_result(driver_id, context)
+ if hook._should_track_driver_via_k8s_api():
+ hook._validate_track_driver_via_k8s_api_config()
Review Comment:
`_should_track_driver_via_k8s_api()` already requires k8s + cluster, so by
the time `_validate...` runs, its not-k8s and not-cluster checks can never fire.
A user who sets `track_driver_via_k8s_api=True` on a YARN/standalone connection
gets a silent no-op (falls through to plain `submit`), not the `ValueError` the
description promises; only the `waitAppCompletion=true` check is reachable.
Call the validation unconditionally when the flag is set, before the
`_should_track...` guard, so misconfiguration is rejected loudly.
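Here is a sketch of the suggested ordering, with the hook reduced to plain arguments. Several details are assumptions for illustration: the helper names, the `k8s://` prefix test for the master, and treating an explicit `waitAppCompletion=true` as the invalid case. The only point is that `validate` runs whenever the flag is set, before the `should_track` guard.

```python
from typing import Mapping

WAIT_APP_COMPLETION_CONF = "spark.kubernetes.submission.waitAppCompletion"


def should_track(flag: bool, master: str, deploy_mode: str) -> bool:
    return flag and master.startswith("k8s://") and deploy_mode == "cluster"


def validate(master: str, deploy_mode: str, conf: Mapping[str, str]) -> None:
    """Reject misconfiguration loudly; only called when the flag is set."""
    if not master.startswith("k8s://"):
        raise ValueError("track_driver_via_k8s_api requires a k8s:// master")
    if deploy_mode != "cluster":
        raise ValueError("track_driver_via_k8s_api requires deploy_mode='cluster'")
    if str(conf.get(WAIT_APP_COMPLETION_CONF, "")).lower() == "true":
        raise ValueError(f"track_driver_via_k8s_api is incompatible with {WAIT_APP_COMPLETION_CONF}=true")


def choose_path(flag: bool, master: str, deploy_mode: str, conf: Mapping[str, str]) -> str:
    if flag:
        # Runs before the guard, so a YARN/standalone connection fails here
        # instead of silently falling through to plain submit.
        validate(master, deploy_mode, conf)
    if should_track(flag, master, deploy_mode):
        return "k8s_api"
    return "submit"
```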
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -41,6 +41,9 @@
DEFAULT_SPARK_BINARY = "spark-submit"
ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]
+_K8S_WAIT_APP_COMPLETION_CONF = "spark.kubernetes.submission.waitAppCompletion"
+_K8S_DELETE_ON_TERMINATION_CONF = "spark.kubernetes.driver.deleteOnTermination"
Review Comment:
`_K8S_DELETE_ON_TERMINATION_CONF` isn't referenced anywhere. Either wire it
in (the pod-deletion behavior reads like it was meant to be configurable
through this) or drop it.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
Review Comment:
`namespace` defaults to `None` when neither the connection extra nor
`spark.kubernetes.namespace` is set, so `read_namespaced_pod(pod, None)` fails
confusingly. The validation (once it's made reachable) should also require a
namespace when the flag is on.
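Here is a sketch of the namespace requirement. The precedence shown (Spark conf `spark.kubernetes.namespace` over the connection extra) is an assumption about how the hook resolves `self._connection["namespace"]`, and the helper name is hypothetical.

```python
from typing import Mapping, Optional


def require_namespace(conn_namespace: Optional[str], spark_conf: Mapping[str, str]) -> str:
    """Return the namespace to poll, or fail early with an actionable message."""
    namespace = spark_conf.get("spark.kubernetes.namespace") or conn_namespace
    if not namespace:
        raise ValueError(
            "track_driver_via_k8s_api needs a namespace: set 'namespace' in the "
            "connection extra or spark.kubernetes.namespace in conf"
        )
    return namespace
```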
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
f"returncode = {returncode}"
)
+ def _poll_k8s_driver_via_api(self) -> None:
+ """Poll the K8s driver pod phase until it reaches a terminal state."""
+ pod_name = self._kubernetes_driver_pod
+ namespace = self._connection["namespace"]
+ app_id = self._kubernetes_application_id or pod_name
+
+ if not pod_name:
+ raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+ client = kube_client.get_kube_client(in_cluster=False)
+ poll_interval = max(self._status_poll_interval, 20)
Review Comment:
`max(self._status_poll_interval, 20)` silently bumps a user-set
`status_poll_interval=5` to 20. It's documented in the docstring, but a
one-time `log.info` when the floor is applied would avoid surprise.
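A minimal sketch of the one-time log follows. It would be called once before the `while True` loop, so the message is emitted at most once per poll. The function name is hypothetical, and the floor of 20 is taken from the diff.

```python
import logging

MIN_K8S_POLL_INTERVAL = 20  # floor used by the diff: max(self._status_poll_interval, 20)


def effective_poll_interval(requested: int, log: logging.Logger, floor: int = MIN_K8S_POLL_INTERVAL) -> int:
    """Same result as max(requested, floor), but say so when the floor kicks in."""
    if requested < floor:
        log.info(
            "status_poll_interval=%s is below the K8s API polling floor; using %s seconds",
            requested,
            floor,
        )
        return floor
    return requested
```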
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.