Ferdinanddb opened a new issue, #58228: URL: https://github.com/apache/airflow/issues/58228
### Apache Airflow version

3.1.2

### If "Other Airflow 2/3 version" selected, which one?

_No response_

### What happened?

Sometimes I end up in a weird state, and it seems to happen through the following chain of events:

- The first try fails on the Airflow side for some reason, but not on the Kubernetes side, where the job keeps running. I guess this can be called a zombie task.
- The second try (retry 1) then, in my case, finds a driver pod matching the job and starts listening to it. The problem appears at the end, when the job finishes and Airflow tries to delete the pod: there is no `self.launcher` object available.

We can see this [here](https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.9.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py#L300):

```python
def execute(self, context: Context):
    self.name = self.create_job_name()
    self._setup_spark_configuration(context)
    if self.deferrable:
        self.execute_async(context)
        return
    super().execute(context)

def _setup_spark_configuration(self, context: Context):
    """Set up Spark-specific configuration including reattach logic."""
    import copy

    template_body = copy.deepcopy(self.template_body)

    if self.reattach_on_restart:
        task_context_labels = self._get_ti_pod_labels(context)
        existing_pod = self.find_spark_job(context)
        if existing_pod:
            self.log.info(
                "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
            )
            self.pod = existing_pod
            self.pod_request_obj = None
            return

        if "spark" not in template_body:
            template_body["spark"] = {}
        if "spec" not in template_body["spark"]:
            template_body["spark"]["spec"] = {}
        spec_dict = template_body["spark"]["spec"]
        if "labels" not in spec_dict:
            spec_dict["labels"] = {}
        spec_dict["labels"].update(task_context_labels)
        for component in ["driver", "executor"]:
            if component not in spec_dict:
                spec_dict[component] = {}
            if "labels" not in spec_dict[component]:
                spec_dict[component]["labels"] = {}
            spec_dict[component]["labels"].update(task_context_labels)

    self.log.info("Creating sparkApplication.")
    self.launcher = CustomObjectLauncher(
        name=self.name,
        namespace=self.namespace,
        kube_client=self.client,
        custom_obj_api=self.custom_obj_api,
        template_body=template_body,
    )
    self.pod = self.get_or_create_spark_crd(self.launcher, context)
    self.pod_request_obj = self.launcher.pod_spec
```

Notice that, in my case, the second try enters the `if existing_pod:` block and therefore returns early without ever initializing `self.launcher`:

```python
existing_pod = self.find_spark_job(context)
if existing_pod:
    self.log.info(
        "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
    )
    self.pod = existing_pod
    self.pod_request_obj = None
    return
```

This explains why `self.launcher` is missing when the operator later tries to delete the pod (a rough workaround sketch follows the next section).

Here is a typical task I run that can enter this state:

```python
maintenance_job = SparkKubernetesOperator(
    task_id="maintenance_job",
    namespace="spark-operator",
    application_file="spark_app/maintenance_job/spark_application_config.yml",
    kubernetes_conn_id="kubernetes_default",
    random_name_suffix=True,
    get_logs=True,
    reattach_on_restart=True,
    delete_on_termination=True,
    do_xcom_push=False,
    deferrable=False,
    retries=1,
    trigger_rule="none_failed_min_one_success",
    on_execute_callback=upload_spark_config_to_gcs,
)
```

### What you think should happen instead?

The job should succeed, and the pod should be deleted.
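As a temporary workaround, something like the following subclass might keep the task green until this is fixed in the provider: it builds a launcher before delegating to the parent's setup, so the reattach path still leaves `self.launcher` available for the final deletion. This is only a sketch based on the 10.9.0 code quoted above, not a tested fix; the subclass name is mine, and it assumes that constructing `CustomObjectLauncher` only stores its arguments and does not create anything on the cluster until the SparkApplication is actually submitted.

```python
# Hypothetical workaround, not the provider's official fix: keep self.launcher
# set even when the operator reattaches to an existing driver pod, so the
# pod/SparkApplication deletion at the end of the run has a launcher to use.
from __future__ import annotations

import copy

from airflow.providers.cncf.kubernetes.operators.custom_object_launcher import CustomObjectLauncher
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator


class ReattachSafeSparkKubernetesOperator(SparkKubernetesOperator):
    """SparkKubernetesOperator that also sets self.launcher on the reattach path."""

    def _setup_spark_configuration(self, context):
        # Build a launcher up front (assumption: the constructor only stores its
        # arguments and performs no Kubernetes calls by itself).
        self.launcher = CustomObjectLauncher(
            name=self.name,
            namespace=self.namespace,
            kube_client=self.client,
            custom_obj_api=self.custom_obj_api,
            template_body=copy.deepcopy(self.template_body),
        )
        # The parent either reattaches (early return, leaving our launcher in
        # place) or creates a new SparkApplication and replaces self.launcher
        # with its own instance; both cases end up with a usable launcher.
        super()._setup_spark_configuration(context)
```

An equivalent fix in the provider itself would be to build the launcher before the early return in `_setup_spark_configuration`, or to make the deletion step tolerate a missing launcher, e.g. by checking `getattr(self, "launcher", None)` first.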
### How to reproduce

Enter a state where you have a zombie task that triggered a Spark job on retry number 0, i.e. the task failed on the Airflow side while the driver pod kept running on Kubernetes. Then, on retry 1, the Spark driver pod will be found and Airflow will listen to it. Once the Spark job completes, Airflow will try to delete the pod and the other objects associated with the job, but it won't be able to, because the `self.launcher` object is missing.

### Operating System

Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12

### Versions of Apache Airflow Providers

See the deployment details; for reference, the cncf-kubernetes provider version is 10.8.1.

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

I use Helm with a custom image built from this Dockerfile:

```
FROM docker.io/apache/airflow:3.1.1-python3.12

USER root

# Copy requirements to working directory
COPY requirements.txt /var/airflow/requirements.txt

# Set the working directory in the container
WORKDIR /var/airflow

USER airflow

RUN pip install --upgrade pip

# Install the necessary dependencies
RUN pip install \
    --no-cache-dir \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.1/constraints-3.12.txt" \
    -r /var/airflow/requirements.txt
```

The requirements.txt file is:

```
apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.1
```

### Anything else?

I opened another issue about the deferrable mode not working for this operator, and I believe the two issues are connected, in the sense that something is handled incorrectly in the operator.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
