Taragolis commented on code in PR #39270:
URL: https://github.com/apache/airflow/pull/39270#discussion_r1580529944
##########
airflow/operators/python.py:
##########
@@ -350,6 +343,36 @@ def get_tasks_to_skip():
return condition
+def _load_pickle():
+ import pickle
+
+ return pickle
+
+
+def _load_dill():
+ try:
+ import dill
+ except ImportError:
+ raise AirflowException("Unable to import 'dill' make sure that it
installed.")
Review Comment:
Raising the new exception does not suppress the original import error; both end up in the traceback:
```console
[2024-04-26, 06:14:53 UTC] {standard_task_runner.py:112} ERROR - Failed to
execute job 7 for task cloudpickle_serializer (Unable to import 'cloudpickle'
make sure that it installed.; 5772)
Traceback (most recent call last):
File "/opt/airflow/airflow/operators/python.py", line 362, in
_load_cloudpickle
import cloudpickle
ModuleNotFoundError: No module named 'cloudpickle'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line
105, in _start_by_fork
ret = args.func(args, dag=self.dag)
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 476, in
task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 253, in
_run_task_by_selected_method
return _run_raw_task(args, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 335, in
_run_raw_task
return ti._run_raw_task(
File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in
_run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/opt/airflow/airflow/models/taskinstance.py", line 2746, in
_execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 2770, in
_execute_task
return _execute_task(self, context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 477, in
_execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 440, in
_execute_callable
return ExecutionCallableRunner(
File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
return_value = super().execute(context)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/operators/python.py", line 493, in execute
return super().execute(context=serializable_context)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/operators/python.py", line 236, in execute
return_value = self.execute_callable()
File "/opt/airflow/airflow/operators/python.py", line 840, in
execute_callable
result = self._execute_python_callable_in_subprocess(python_path)
File "/opt/airflow/airflow/operators/python.py", line 532, in
_execute_python_callable_in_subprocess
self._write_args(input_path)
File "/opt/airflow/airflow/operators/python.py", line 502, in _write_args
file.write_bytes(self.pickling_library.dumps({"args": self.op_args,
"kwargs": self.op_kwargs}))
File "/opt/airflow/airflow/operators/python.py", line 364, in
_load_cloudpickle
raise AirflowException("Unable to import 'cloudpickle' make sure that it
installed.")
airflow.exceptions.AirflowException: Unable to import 'cloudpickle' make
sure that it installed.
```
Compare that to just using a plain `import cloudpickle`:
```console
[2024-04-26, 06:17:42 UTC] {standard_task_runner.py:112} ERROR - Failed to
execute job 8 for task cloudpickle_serializer (No module named 'cloudpickle';
5805)
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line
105, in _start_by_fork
ret = args.func(args, dag=self.dag)
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 476, in
task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 253, in
_run_task_by_selected_method
return _run_raw_task(args, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 335, in
_run_raw_task
return ti._run_raw_task(
File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in
_run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/opt/airflow/airflow/models/taskinstance.py", line 2746, in
_execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 2770, in
_execute_task
return _execute_task(self, context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 477, in
_execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 440, in
_execute_callable
return ExecutionCallableRunner(
File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
return_value = super().execute(context)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/operators/python.py", line 494, in execute
return super().execute(context=serializable_context)
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/opt/airflow/airflow/operators/python.py", line 236, in execute
return_value = self.execute_callable()
File "/opt/airflow/airflow/operators/python.py", line 841, in
execute_callable
result = self._execute_python_callable_in_subprocess(python_path)
File "/opt/airflow/airflow/operators/python.py", line 533, in
_execute_python_callable_in_subprocess
self._write_args(input_path)
File "/opt/airflow/airflow/operators/python.py", line 503, in _write_args
file.write_bytes(self.pickling_library.dumps({"args": self.op_args,
"kwargs": self.op_kwargs}))
File "/opt/airflow/airflow/operators/python.py", line 365, in
_load_cloudpickle
import cloudpickle
ModuleNotFoundError: No module named 'cloudpickle'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]