This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d409ba9a275 Make the launcher a cached_property and minor change to improve how pod name is retrieved (#58646)
d409ba9a275 is described below

commit d409ba9a275f3cee952fda75543b6f2467157429
Author: Ferdinand de Baecque <[email protected]>
AuthorDate: Sat Nov 29 16:33:41 2025 +0100

    Make the launcher a cached_property and minor change to improve how pod name is retrieved (#58646)
    
    * fix(cncf-kubernetes): make the launcher a cached_property and minor change to improve how pod name is retrieved in spark_kubernetes.py, add unit tests
    
    * chore: use removesuffix to set self.name and job_name variables as suggested by @Nataneljpwd
---
 .../cncf/kubernetes/operators/spark_kubernetes.py  | 21 ++++-
 .../kubernetes/operators/test_spark_kubernetes.py  | 99 ++++++++++++++++++++++
 2 files changed, 117 insertions(+), 3 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index c1f92af0037..221162c4697 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -286,6 +286,16 @@ class SparkKubernetesOperator(KubernetesPodOperator):
     def custom_obj_api(self) -> CustomObjectsApi:
         return CustomObjectsApi()
 
+    @cached_property
+    def launcher(self) -> CustomObjectLauncher:
+        return CustomObjectLauncher(
+            name=self.name,
+            namespace=self.namespace,
+            kube_client=self.client,
+            custom_obj_api=self.custom_obj_api,
+            template_body=self.template_body,
+        )
+
     def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) 
-> k8s.V1Pod:
         if self.reattach_on_restart:
             driver_pod = self.find_spark_job(context)
@@ -323,6 +333,8 @@ class SparkKubernetesOperator(KubernetesPodOperator):
                 )
                 self.pod = existing_pod
                 self.pod_request_obj = None
+                if self.pod.metadata.name.endswith("-driver"):
+                    self.name = self.pod.metadata.name.removesuffix("-driver")
                 return
 
             if "spark" not in template_body:
@@ -361,9 +373,12 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         return self.find_spark_job(context, exclude_checked=exclude_checked)
 
     def on_kill(self) -> None:
-        if self.launcher:
-            self.log.debug("Deleting spark job for task %s", self.task_id)
-            self.launcher.delete_spark_job()
+        self.log.debug("Deleting spark job for task %s", self.task_id)
+        job_name = self.name
+        if self.pod and self.pod.metadata and self.pod.metadata.name:
+            if self.pod.metadata.name.endswith("-driver"):
+                job_name = self.pod.metadata.name.removesuffix("-driver")
+        self.launcher.delete_spark_job(spark_job_name=job_name)
 
     def patch_already_checked(self, pod: k8s.V1Pod, *, reraise=True):
         """Add an "already checked" annotation to ensure we don't reattach on 
retries."""
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
index 9f0180334bb..0197d906db0 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -1149,3 +1149,102 @@ def test_create_job_name_should_truncate_long_names():
     pod_name = op.create_job_name()
 
     assert pod_name == long_name[:MAX_LABEL_LEN]
+
+
+class TestSparkKubernetesLifecycle:
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.CustomObjectLauncher")
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
+    def test_launcher_access_without_execute(self, mock_hook, 
mock_launcher_cls):
+        """Test that launcher is accessible even if execute is not called 
(e.g. after deferral)."""
+        op = SparkKubernetesOperator(
+            task_id="test_task",
+            namespace="default",
+            application_file="example.yaml",
+            kubernetes_conn_id="kubernetes_default",
+        )
+
+        # Mock the template body loading since we don't have a real file
+        with mock.patch.object(SparkKubernetesOperator, 
"manage_template_specs") as mock_manage:
+            mock_manage.return_value = {"spark": {"spec": {}}}
+
+            # Access launcher
+            launcher = op.launcher
+
+            assert launcher is not None
+            assert mock_launcher_cls.called
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.CustomObjectLauncher")
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
+    def test_on_kill_works_without_execute(self, mock_hook, mock_launcher_cls):
+        """Test that on_kill works without execute being called."""
+        op = SparkKubernetesOperator(
+            task_id="test_task",
+            namespace="default",
+            application_file="example.yaml",
+            name="test-job",
+        )
+
+        mock_launcher_instance = mock_launcher_cls.return_value
+
+        with mock.patch.object(SparkKubernetesOperator, 
"manage_template_specs") as mock_manage:
+            mock_manage.return_value = {"spark": {"spec": {}}}
+
+            op.on_kill()
+
+            # Should call delete_spark_job on the launcher
+            mock_launcher_instance.delete_spark_job.assert_called_once()
+
+            # Check arguments
+            call_args = mock_launcher_instance.delete_spark_job.call_args
+            # We expect spark_job_name="test-job"
+            assert call_args.kwargs.get("spark_job_name") == "test-job"
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.CustomObjectLauncher")
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
+    
@mock.patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesPodOperator.execute")
+    def test_reattach_skips_launcher_creation_in_execute(
+        self, mock_super_execute, mock_hook, mock_launcher_cls
+    ):
+        """Test that reattach logic skips explicit launcher creation but 
property still works."""
+        op = SparkKubernetesOperator(
+            task_id="test_task",
+            namespace="default",
+            application_file="example.yaml",
+            reattach_on_restart=True,
+        )
+
+        # Mock finding an existing pod
+        mock_pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="existing-pod"))
+
+        with (
+            mock.patch.object(SparkKubernetesOperator, "find_spark_job", 
return_value=mock_pod),
+            mock.patch.object(
+                SparkKubernetesOperator, "manage_template_specs", 
return_value={"spark": {"spec": {}}}
+            ),
+            mock.patch.object(SparkKubernetesOperator, "_get_ti_pod_labels", 
return_value={}),
+        ):
+            context = {"ti": mock.MagicMock(), "run_id": "test_run"}
+
+            # Run execute
+            op.execute(context)
+
+            # Verify super().execute was called
+            mock_super_execute.assert_called_once()
+
+            # Verify launcher was NOT instantiated during execute (because we 
returned early)
+            # We can check if the mock_launcher_cls was instantiated.
+            # It should NOT be instantiated during execute because 
_setup_spark_configuration returns early.
+            # However, accessing op.launcher later WILL instantiate it.
+
+            # Reset mock to clear any previous calls (though there shouldn't 
be any)
+            mock_launcher_cls.reset_mock()
+
+            # Access launcher now
+            assert op.launcher is not None
+
+            # Now it should have been instantiated
+            mock_launcher_cls.assert_called_once()
+
+            # And verify delete works
+            op.on_kill()
+            mock_launcher_cls.return_value.delete_spark_job.assert_called()

Reply via email to