Ferdinanddb opened a new issue, #58228:
URL: https://github.com/apache/airflow/issues/58228

   ### Apache Airflow version
   
   3.1.2
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Sometimes I end up in a weird state, and it seems to happen through the following chain of events:
   - The first try fails on the Airflow side for some reason, but not on the Kubernetes side, where the job keeps running. So I guess this can be called a zombie task.
   - The retry then, in my case, finds that a driver pod matching the job already exists, so it reattaches and starts listening to it. The problem happens at the end, when the job finishes and Airflow tries to delete the pod: there is no `self.launcher` object available.
   
   We can see this in `_setup_spark_configuration`, [here](https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.9.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py#L300):
   
   ```python
       def execute(self, context: Context):
           self.name = self.create_job_name()
   
           self._setup_spark_configuration(context)
   
           if self.deferrable:
               self.execute_async(context)
   
           return super().execute(context)
   
       def _setup_spark_configuration(self, context: Context):
           """Set up Spark-specific configuration including reattach logic."""
           import copy
   
           template_body = copy.deepcopy(self.template_body)
   
           if self.reattach_on_restart:
               task_context_labels = self._get_ti_pod_labels(context)
   
               existing_pod = self.find_spark_job(context)
               if existing_pod:
                    self.log.info(
                        "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                    )
                   self.pod = existing_pod
                   self.pod_request_obj = None
                   return
   
               if "spark" not in template_body:
                   template_body["spark"] = {}
               if "spec" not in template_body["spark"]:
                   template_body["spark"]["spec"] = {}
   
               spec_dict = template_body["spark"]["spec"]
   
               if "labels" not in spec_dict:
                   spec_dict["labels"] = {}
               spec_dict["labels"].update(task_context_labels)
   
               for component in ["driver", "executor"]:
                   if component not in spec_dict:
                       spec_dict[component] = {}
   
                   if "labels" not in spec_dict[component]:
                       spec_dict[component]["labels"] = {}
   
                   spec_dict[component]["labels"].update(task_context_labels)
   
           self.log.info("Creating sparkApplication.")
           self.launcher = CustomObjectLauncher(
               name=self.name,
               namespace=self.namespace,
               kube_client=self.client,
               custom_obj_api=self.custom_obj_api,
               template_body=template_body,
           )
           self.pod = self.get_or_create_spark_crd(self.launcher, context)
           self.pod_request_obj = self.launcher.pod_spec
   ```
   
   Notice that, in my case, the second try enters the `if` block below, and so it returns early without initializing the `self.launcher` object:
   
   ```python
               existing_pod = self.find_spark_job(context)
               if existing_pod:
                    self.log.info(
                        "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                    )
                   self.pod = existing_pod
                   self.pod_request_obj = None
                   return
   ```
   
   This explains why `self.launcher` is missing.
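   
   A minimal sketch of one possible fix (not an official patch, just to illustrate the idea): construct the `CustomObjectLauncher` before the early return, with the same arguments the normal code path uses, so the cleanup logic still has a launcher to call:
   
    ```python
            existing_pod = self.find_spark_job(context)
            if existing_pod:
                self.log.info(
                    "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                )
                self.pod = existing_pod
                self.pod_request_obj = None
                # Sketch: also build the launcher when reattaching, so that
                # deleting the SparkApplication at the end of the run can work.
                # Note: template_body here is the deepcopy made at the top of
                # _setup_spark_configuration, before the labels are applied.
                self.launcher = CustomObjectLauncher(
                    name=self.name,
                    namespace=self.namespace,
                    kube_client=self.client,
                    custom_obj_api=self.custom_obj_api,
                    template_body=template_body,
                )
                return
    ```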
   
   Here is a typical task I run that can enter this state (note `reattach_on_restart=True` together with `delete_on_termination=True`, which is the combination that exercises both the reattach path and the pod deletion at the end):
   ```python
   maintenance_job = SparkKubernetesOperator(
           task_id="maintenance_job",
            namespace="spark-operator",
            application_file="spark_app/maintenance_job/spark_application_config.yml",
           kubernetes_conn_id="kubernetes_default",
           random_name_suffix=True,
           get_logs=True,
           reattach_on_restart=True,
           delete_on_termination=True,
           do_xcom_push=False,
           deferrable=False,
           retries=1,
           trigger_rule="none_failed_min_one_success",
           on_execute_callback=upload_spark_config_to_gcs,
       )
   ```
   
   ### What you think should happen instead?
   
   The job should succeed, and the pod should be deleted.
   
   ### How to reproduce
   
   Enter a state where you have a zombie task that triggered a Spark job on try number 0. Then, on retry 1, the Spark driver pod will be found and Airflow will reattach and listen to it. Once the Spark job completes, Airflow will try to delete the pod and the other objects associated with the job, but it cannot, because the `self.launcher` object is missing.
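   
   Until the root cause is fixed, a second sketch (the names here are illustrative; `delete_spark_job` is the `CustomObjectLauncher` method I assume handles deletion, so double-check against the provider source): the cleanup path could tolerate a missing launcher instead of raising `AttributeError`:
   
    ```python
    # Defensive guard, wherever the operator performs post-run cleanup.
    launcher = getattr(self, "launcher", None)
    if launcher is not None:
        # Assumed CustomObjectLauncher API for removing the SparkApplication.
        launcher.delete_spark_job(self.name)
    else:
        self.log.warning(
            "No launcher available (task was likely reattached to an existing pod); "
            "skipping SparkApplication deletion."
        )
    ```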
   
   ### Operating System
   
   Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12
   
   ### Versions of Apache Airflow Providers
   
   See the deployment details below; for reference, the cncf-kubernetes provider version is 10.8.1.
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   I use Helm with a custom image built from this Dockerfile:
    ```dockerfile
   FROM docker.io/apache/airflow:3.1.1-python3.12
   
   USER root
   
   # Copy requirements to working directory
   COPY requirements.txt /var/airflow/requirements.txt
   
   # Set the working directory in the container
   WORKDIR /var/airflow
   
   
   USER airflow
   
   RUN pip install --upgrade pip
   
   # Install the necessary dependencies
    RUN pip install \
        --no-cache-dir \
        --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.1/constraints-3.12.txt" \
        -r /var/airflow/requirements.txt
   ```
   
   The requirements.txt file is:
    ```
    apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.1
    ```
   
   ### Anything else?
   
   I opened another issue regarding deferrable mode not working on this operator, and I believe the two issues are connected, in the sense that something is wrong in how the operator manages its internal state.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

