SameerMesiah97 commented on code in PR #61110:
URL: https://github.com/apache/airflow/pull/61110#discussion_r2732769667
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -279,6 +283,9 @@ def find_spark_job(self, context, exclude_checked: bool = True):
            self.log.info("`try_number` of pod: %s", pod.metadata.labels.get("try_number", "unknown"))
            return pod
+    def _get_field_selector(self) -> str:
+        return f"status.phase!={PodPhase.RUNNING},status.phase!={PodPhase.PENDING}"
+
Review Comment:
I believe this will exclude Pending and Running pods. Are you sure this is what you wanted? Maybe you wanted to select Running or Pending pods, but I am not sure field selectors support OR conditions.
I think it would be better not to alter the query and to filter afterwards instead. Unless there is something I am missing here.
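To illustrate the suggestion: Kubernetes field selectors comma-join as AND, so "phase is Running OR Pending" cannot be expressed server-side, but it is trivial client-side. A minimal sketch of the filter-afterwards approach, where `ACTIVE_PHASES` and `filter_active_pods` are hypothetical names and `SimpleNamespace` objects stand in for the `V1Pod` instances returned by `list_namespaced_pod()`:

```python
from types import SimpleNamespace

# Hypothetical helper: the phases we want to keep (an OR condition that
# Kubernetes field selectors cannot express server-side).
ACTIVE_PHASES = {"Running", "Pending"}

def filter_active_pods(pods):
    """Keep only pods whose status.phase is Running or Pending."""
    return [p for p in pods if p.status.phase in ACTIVE_PHASES]

# Stub pods standing in for V1Pod objects.
pods = [
    SimpleNamespace(status=SimpleNamespace(phase="Running"), metadata=SimpleNamespace(name="a")),
    SimpleNamespace(status=SimpleNamespace(phase="Failed"), metadata=SimpleNamespace(name="b")),
    SimpleNamespace(status=SimpleNamespace(phase="Pending"), metadata=SimpleNamespace(name="c")),
]
print([p.metadata.name for p in filter_active_pods(pods)])  # -> ['a', 'c']
```

This keeps the list query untouched (only the existing label selector) and applies the phase filter in Python after the call returns.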
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -248,23 +247,28 @@ def find_spark_job(self, context, exclude_checked: bool = True):
            self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
+            + ",spark-role=driver"
        )
-        pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector).items
+        # since we did not specify a resource version, we make sure to get the latest data
+        # we make sure we get only running or pending pods.
+        field_selector = self._get_field_selector()
+        pod_list = self.client.list_namespaced_pod(
+            self.namespace, label_selector=label_selector, field_selector=field_selector
+        ).items
pod = None
if len(pod_list) > 1:
            # When multiple pods match the same labels, select one deterministically,
-            # preferring a Running pod, then creation time, with name as a tie-breaker.
+            # preferring a Running or Pending pod: if another pod was created, it will be
+            # either terminating or in a terminal phase, and a terminating pod will have a
+            # deletion_timestamp.
+            # Pending pods also need to be selected: if a driver pod just failed and a new
+            # pod was created, we do not want the task to fail.
            pod = max(
                pod_list,
-                key=lambda p: (
-                    p.status.phase == PodPhase.RUNNING,
-                    p.metadata.creation_timestamp or datetime.min.replace(tzinfo=timezone.utc),
-                    p.metadata.name or "",
-                ),
+                key=lambda p: (p.metadata.deletion_timestamp is None, p.metadata.name or ""),
            )
Review Comment:
The exclusion of terminating pods is a great catch; it was previously overlooked. But why remove the other checks? I would suggest adding this new condition as the highest-priority key and keeping the remaining ones.
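A hedged sketch of what combining the checks could look like: the new `deletion_timestamp` test becomes the first element of the key tuple, and the original Running-phase, creation-time, and name tie-breakers are kept after it. `select_pod` and `make_pod` are hypothetical names, `SimpleNamespace` objects stand in for `V1Pod` instances, and the string `"Running"` stands in for `PodPhase.RUNNING`:

```python
from datetime import datetime, timezone
from types import SimpleNamespace

RUNNING = "Running"  # stands in for PodPhase.RUNNING in the provider

def select_pod(pod_list):
    """Pick one pod deterministically: non-terminating pods first, then
    Running pods, then newest creation time, with name as the tie-breaker."""
    return max(
        pod_list,
        key=lambda p: (
            p.metadata.deletion_timestamp is None,  # new highest-priority check
            p.status.phase == RUNNING,              # original: prefer Running pods
            p.metadata.creation_timestamp or datetime.min.replace(tzinfo=timezone.utc),
            p.metadata.name or "",
        ),
    )

def make_pod(name, phase, created, deleting=None):
    # Stub standing in for a kubernetes V1Pod.
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name, creation_timestamp=created, deletion_timestamp=deleting),
        status=SimpleNamespace(phase=phase),
    )

t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2024, 1, 2, tzinfo=timezone.utc)
pods = [
    make_pod("old-running", RUNNING, t0),
    make_pod("terminating", RUNNING, t1, deleting=t1),  # deprioritized despite being newer
    make_pod("new-running", RUNNING, t1),
]
print(select_pod(pods).metadata.name)  # -> new-running
```

Because Python compares tuples element by element, the terminating pod loses on the first key even though it is the same age as the winner, and the remaining keys still break ties deterministically as before.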
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]