Taragolis commented on code in PR #39270:
URL: https://github.com/apache/airflow/pull/39270#discussion_r1580529944


##########
airflow/operators/python.py:
##########
@@ -350,6 +343,36 @@ def get_tasks_to_skip():
         return condition
 
 
+def _load_pickle():
+    import pickle
+
+    return pickle
+
+
+def _load_dill():
+    try:
+        import dill
+    except ImportError:
+        raise AirflowException("Unable to import 'dill' make sure that it 
installed.")

Review Comment:
   Raising `AirflowException` here does not suppress the original import error; both exceptions still appear in the traceback:
   
   ```console
   [2024-04-26, 06:14:53 UTC] {standard_task_runner.py:112} ERROR - Failed to 
execute job 7 for task cloudpickle_serializer (Unable to import 'cloudpickle' 
make sure that it installed.; 5772)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/operators/python.py", line 362, in 
_load_cloudpickle
       import cloudpickle
   ModuleNotFoundError: No module named 'cloudpickle'
   During handling of the above exception, another exception occurred:
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 
105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 476, in 
task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 253, in 
_run_task_by_selected_method
       return _run_raw_task(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 335, in 
_run_raw_task
       return ti._run_raw_task(
     File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in 
_run_raw_task
       self._execute_task_with_callbacks(context, test_mode, session=session)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2746, in 
_execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2770, in 
_execute_task
       return _execute_task(self, context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 477, in 
_execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 440, in 
_execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
       return self.func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
       return_value = super().execute(context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 493, in execute
       return super().execute(context=serializable_context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 236, in execute
       return_value = self.execute_callable()
     File "/opt/airflow/airflow/operators/python.py", line 840, in 
execute_callable
       result = self._execute_python_callable_in_subprocess(python_path)
     File "/opt/airflow/airflow/operators/python.py", line 532, in 
_execute_python_callable_in_subprocess
       self._write_args(input_path)
     File "/opt/airflow/airflow/operators/python.py", line 502, in _write_args
       file.write_bytes(self.pickling_library.dumps({"args": self.op_args, 
"kwargs": self.op_kwargs}))
     File "/opt/airflow/airflow/operators/python.py", line 364, in 
_load_cloudpickle
       raise AirflowException("Unable to import 'cloudpickle' make sure that it 
installed.")
   airflow.exceptions.AirflowException: Unable to import 'cloudpickle' make 
sure that it installed.
   ```
   
   Compare that with the traceback from a plain `import cloudpickle`:
   
   ```console
   [2024-04-26, 06:17:42 UTC] {standard_task_runner.py:112} ERROR - Failed to 
execute job 8 for task cloudpickle_serializer (No module named 'cloudpickle'; 
5805)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 
105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 476, in 
task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 253, in 
_run_task_by_selected_method
       return _run_raw_task(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 335, in 
_run_raw_task
       return ti._run_raw_task(
     File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in 
_run_raw_task
       self._execute_task_with_callbacks(context, test_mode, session=session)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2746, in 
_execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2770, in 
_execute_task
       return _execute_task(self, context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 477, in 
_execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 440, in 
_execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
       return self.func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
       return_value = super().execute(context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 494, in execute
       return super().execute(context=serializable_context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 236, in execute
       return_value = self.execute_callable()
     File "/opt/airflow/airflow/operators/python.py", line 841, in 
execute_callable
       result = self._execute_python_callable_in_subprocess(python_path)
     File "/opt/airflow/airflow/operators/python.py", line 533, in 
_execute_python_callable_in_subprocess
       self._write_args(input_path)
     File "/opt/airflow/airflow/operators/python.py", line 503, in _write_args
       file.write_bytes(self.pickling_library.dumps({"args": self.op_args, 
"kwargs": self.op_kwargs}))
     File "/opt/airflow/airflow/operators/python.py", line 365, in 
_load_cloudpickle
       import cloudpickle
   ModuleNotFoundError: No module named 'cloudpickle'
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to