andallo commented on issue #41212: URL: https://github.com/apache/airflow/issues/41212#issuecomment-2268957552
I haven't dealt with other operators, only with SparkKubernetesOperator. Let me give some notes about the [PR](https://github.com/apache/airflow/pull/41255). I tested the changes to custom_object_launcher.py and the result isn't what I expected. The first launch of the application failed with this error:

```
[2024-08-05, 06:33:26 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/dags/repo/custom_operators/spark_kubernetes_operator_8_3_1.py", line 314, in execute
    self.pod = self.get_or_create_spark_crd(self.launcher, context)
  File "/opt/airflow/dags/repo/custom_operators/spark_kubernetes_operator_8_3_1.py", line 250, in get_or_create_spark_crd
    driver_pod, spark_obj_spec = launcher.start_spark_job(
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 330, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 467, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 470, in __call__
    result = fn(*args, **kwargs)
  File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 313, in start_spark_job
    raise e
  File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 300, in start_spark_job
    while self.spark_job_not_running(self.spark_obj_spec):
  File "/opt/airflow/dags/repo/custom_operators/custom_object_launcher.py", line 369, in spark_job_not_running
    raise AirflowException(
airflow.exceptions.AirflowException: Spark Job Status/Driver State not found. Please check if the Job/Pod is running.
```

But this error failed only the Airflow task (pod); the driver and the Spark application continued to run. A second attempt restarted the task, and because the reattach_on_restart logic connected the new task to the existing driver, the CustomLauncher wasn't used at all.

Then I removed the driver pod manually and expected the task to fail. But again the task ended with success status.

I think you are trying to solve the case where the driver is deleted while the application is still launching. But I described a different case in this issue: I delete the driver after the application is launched, when the driver is running and control has already returned from the start_spark_job function of the CustomLauncher.
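To make the expected behaviour concrete, here is a minimal, dependency-free sketch of the outcome mapping I would expect while monitoring the driver after `start_spark_job` has returned. The function name `resolve_task_state` and the mapping itself are hypothetical illustrations, not the actual operator API; the phase strings are the standard Kubernetes pod phases, with `None` modelling the case from this issue where the driver pod has been deleted.

```python
# Hypothetical sketch (not real SparkKubernetesOperator code):
# map the driver pod phase observed after launch to the Airflow
# task outcome. `None` models a driver pod that was deleted manually.

def resolve_task_state(driver_pod_phase):
    """Return the expected task outcome for a driver pod phase.

    driver_pod_phase: a Kubernetes pod phase string
    ("Pending", "Running", "Succeeded", "Failed", "Unknown"),
    or None when the pod no longer exists.
    """
    if driver_pod_phase is None:
        # Driver deleted after launch -> the task should fail,
        # instead of ending with success status as described above.
        return "failed"
    if driver_pod_phase == "Succeeded":
        return "success"
    if driver_pod_phase in ("Pending", "Running"):
        return "running"
    # "Failed" or "Unknown"
    return "failed"
```

In other words, a missing driver pod should be treated the same as a failed one, rather than letting the task keep (or inherit, via reattach) a success status.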
