Nataneljpwd commented on code in PR #61110:
URL: https://github.com/apache/airflow/pull/61110#discussion_r2756014608


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+
+    def test_find_spark_job_picks_succeeded(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Succeeded pod should be selected.
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
+        succeeded_pod.metadata.name = "spark-driver-running"
+        succeeded_pod.metadata.labels = {"try_number": "1"}
+        succeeded_pod.status.phase = "Succeeded"
+
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        # Failed pod should not be selected.
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, 
tzinfo=timezone.utc)
+        failed_pod.metadata.deletion_timestamp = timezone.datetime(2025, 1, 2, 
tzinfo=timezone.utc)
+        failed_pod.metadata.name = "spark-driver-pending"
+        failed_pod.metadata.labels = {"try_number": "1"}
+        failed_pod.status.phase = "Failed"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            succeeded_pod,
+            terminating_pod,
+            failed_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is succeeded_pod
 
     def test_find_spark_job_picks_latest_pod(

Review Comment:
   done



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+
+    def test_find_spark_job_picks_succeeded(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Succeeded pod should be selected.
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
+        succeeded_pod.metadata.name = "spark-driver-running"
+        succeeded_pod.metadata.labels = {"try_number": "1"}

Review Comment:
   It does not have a deletion timestamp because it was not deleted, which is why I did not add one



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+
+    def test_find_spark_job_picks_succeeded(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Succeeded pod should be selected.
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
+        succeeded_pod.metadata.name = "spark-driver-running"

Review Comment:
   done



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+
+    def test_find_spark_job_picks_succeeded(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"

Review Comment:
   Oops, done



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+
+    def test_find_spark_job_picks_succeeded(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.

Review Comment:
   done



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"
         pending_pod.metadata.labels = {"try_number": "1"}
         pending_pod.status.phase = "Pending"
 
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
         mock_get_kube_client.list_namespaced_pod.return_value.items = [
-            running_pod,
             pending_pod,
+            terminating_pod,
         ]
 
         returned_pod = op.find_spark_job(context)
 
-        assert returned_pod is running_pod
+        assert returned_pod is pending_pod
+

Review Comment:
   It is a pending pod rather than a non-terminating one; that leaves less room for interpretation, since it states directly that we select a pending pod



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"

Review Comment:
   Renamed; though, same as above, it remains a pending pod



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
         running_pod.metadata.labels = {"try_number": "1"}
         running_pod.status.phase = "Running"
 
-        # Pending pod should not be selected.
+        # Terminating pod should not be selected.
+        terminating_pod = mock.MagicMock()
+        terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 
1, 1, tzinfo=timezone.utc)
+        terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 
1, 2, tzinfo=timezone.utc)
+        terminating_pod.metadata.name = "spark-driver-pending"
+        terminating_pod.metadata.labels = {"try_number": "1"}
+        terminating_pod.status.phase = "Running"
+
+        mock_get_kube_client.list_namespaced_pod.return_value.items = [
+            running_pod,
+            terminating_pod,
+        ]
+
+        returned_pod = op.find_spark_job(context)
+
+        assert returned_pod is running_pod
+
+    def test_find_spark_job_picks_pending_pod(
+        self,
+        mock_is_in_cluster,
+        mock_parent_execute,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        """
+        Verifies that find_spark_job picks a Running Spark driver pod over a 
non-Running pod.
+        """
+
+        task_name = "test_find_spark_job_prefers_running_pod"
+        job_spec = 
yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+        mock_create_job_name.return_value = task_name
+        op = SparkKubernetesOperator(
+            template_spec=job_spec,
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id=task_name,
+            get_logs=True,
+            reattach_on_restart=True,
+        )
+        context = create_context(op)
+
+        # Pending pod should be selected.
         pending_pod = mock.MagicMock()
         pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 
1, tzinfo=timezone.utc)
-        pending_pod.metadata.name = "spark-driver-pending"
+        pending_pod.metadata.name = "spark-driver-running"

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to