andallo commented on issue #41212:
URL: https://github.com/apache/airflow/issues/41212#issuecomment-2268957552

   I haven't dealt with other operators, only with SparkKubernetesOperator.
   
   Let me share some notes about [PR](https://github.com/apache/airflow/pull/41255). I tested the changes to custom_object_launcher.py, and the result isn't what I expected. The first launch of the application failed with this error:
   ```
   [2024-08-05, 06:33:26 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/dags/repo/custom_operators/spark_kubernetes_operator_8_3_1.py", line 314, in execute
       self.pod = self.get_or_create_spark_crd(self.launcher, context)
     File "/opt/airflow/dags/repo/custom_operators/spark_kubernetes_operator_8_3_1.py", line 250, in get_or_create_spark_crd
       driver_pod, spark_obj_spec = launcher.start_spark_job(
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 330, in wrapped_f
       return self(f, *args, **kw)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 467, in __call__
       do = self.iter(retry_state=retry_state)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 368, in iter
       result = action(retry_state)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 390, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
       return self.__get_result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 470, in __call__
       result = fn(*args, **kwargs)
     File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 313, in start_spark_job
       raise e
     File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 300, in start_spark_job
       while self.spark_job_not_running(self.spark_obj_spec):
     File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 369, in spark_job_not_running
       raise AirflowException(
   airflow.exceptions.AirflowException: Spark Job Status/Driver State not found. Please check if the Job/Pod is running.
   ```
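   For what it's worth, the exception seems to fire because the SparkApplication status subresource isn't populated yet right after submission. A minimal sketch of that condition (a hypothetical helper of mine, not the actual provider code), assuming the launcher reads the custom resource as a plain dict:
   
   ```python
   from typing import Optional
   
   def get_driver_state(spark_app: dict) -> Optional[str]:
       """Return the application state from a SparkApplication resource dict,
       or None when the operator hasn't written the status yet -- the
       condition that raises the AirflowException above."""
       status = spark_app.get("status") or {}
       return (status.get("applicationState") or {}).get("state")
   ```
   
   Right after submission `get_driver_state` would return None, so a retry/grace period there seems needed rather than raising immediately.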
   
   But this error only failed the Airflow task (pod); the driver and the Spark application continued to run. The second try started the task, because the reattach_on_restart logic connected the new task to the existing driver and the CustomLauncher wasn't used. Then I removed the driver pod manually and expected the task to fail, but again the task ended with success status.
   
   I think you are trying to solve the case where the driver is deleted during the launch of the application. But I described a different case in this issue: I delete the driver after the application is launched, when the driver is running and control has already returned from the start_spark_job function of the CustomLauncher.
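   In other words, what I'd expect from the monitoring phase is roughly this decision (a hedged sketch under my assumptions, not the provider's actual logic): a driver that disappears before the job reaches a terminal state should fail the task instead of letting it succeed.
   
   ```python
   from typing import Optional
   
   def expected_task_outcome(driver_pod_exists: bool, job_state: Optional[str]) -> str:
       """Hypothetical decision table for the case described above."""
       if job_state == "COMPLETED":
           return "success"   # job finished normally before the driver went away
       if not driver_pod_exists:
           return "failed"    # driver deleted mid-run -> the task should fail
       return "running"       # keep monitoring
   ```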


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
