phi-friday opened a new issue, #35263:
URL: https://github.com/apache/airflow/issues/35263
### Apache Airflow version
2.7.2
### What happened
When the Docker operator and the Python operator are used together in one DAG, the Celery
executor throws a `TypeError`.
```log
[2023-10-30 16:22:36,427: INFO/ForkPoolWorker-7] Running <TaskInstance:
test_docker_task_error.no_error manual__2023-10-30T07:22:35.534710+00:00
[queued]> on host 97bf2fd65629
[2023-10-30 16:22:36,865: ERROR/ForkPoolWorker-7]
[081e6272-34c9-4f7f-8d5f-9dd490a1d663] Failed to execute task cannot pickle
'module' object.
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 155, in _execute_in_fork
args.func(args)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line
113, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py",
line 431, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py",
line 209, in _run_task_by_selected_method
return _run_task_by_local_task_job(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py",
line 271, in _run_task_by_local_task_job
ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 77, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line
289, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line
318, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py",
line 192, in _execute
self.handle_task_exit(return_code)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py",
line 232, in handle_task_exit
self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 77, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
line 2748, in schedule_downstream_tasks
partial_dag = task.dag.partial_subset(
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line
2417, in partial_subset
dag.task_dict = {
^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line
2418, in <dictcomp>
t.task_id: _deepcopy_task(t)
^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line
2415, in _deepcopy_task
return copy.deepcopy(t, memo)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/copy.py", line 153, in deepcopy
y = copier(memo)
^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
line 1213, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/copy.py", line 161, in deepcopy
rv = reductor(4)
^^^^^^^^^^^
TypeError: cannot pickle 'module' object
[2023-10-30 16:22:36,890: ERROR/ForkPoolWorker-7] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[081e6272-34c9-4f7f-8d5f-9dd490a1d663]
raised unexpected: AirflowException('Celery command failed on host:
97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663')
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line
477, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line
760, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 121, in execute_command
_execute_in_fork(command_to_exec, celery_task_id)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 136, in _execute_in_fork
raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host:
97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663
[2023-10-30 16:22:37,265: INFO/MainProcess] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3]
received
[2023-10-30 16:22:37,273: INFO/ForkPoolWorker-7]
[4fb8737b-2a3d-493b-ade6-7edfd6042df3] Executing command in Celery: ['airflow',
'tasks', 'run', 'test_docker_task_error', 'pickle_error',
'manual__2023-10-30T07:22:35.534710+00:00', '--local', '--subdir',
'DAGS_FOLDER/test_error.py']
[2023-10-30 16:22:37,355: INFO/ForkPoolWorker-7] Filling up the DagBag from
/opt/airflow/dags/test_error.py
[2023-10-30 16:22:37,599: INFO/ForkPoolWorker-7] Running <TaskInstance:
test_docker_task_error.pickle_error manual__2023-10-30T07:22:35.534710+00:00
[queued]> on host 97bf2fd65629
[2023-10-30 16:22:38,885: INFO/ForkPoolWorker-7] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3]
succeeded in 1.618334080092609s: None
```
The error occurs only when the two operators are linked by a dependency.
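The traceback points at `schedule_downstream_tasks`, which deep-copies every task in the DAG via `partial_subset`. Any operator attribute that holds a module object makes `copy.deepcopy` fail with exactly this message. A minimal sketch (the class below is illustrative, not Airflow code) reproduces the underlying failure:

```python
import copy


class HoldsModule:
    """Stand-in for an operator that keeps a module object as an attribute."""

    def __init__(self):
        import logging

        self.client_module = logging  # any module reference breaks deepcopy


try:
    copy.deepcopy(HoldsModule())
except TypeError as exc:
    print(exc)  # -> cannot pickle 'module' object
```

This matches the last frames of the traceback: `__deepcopy__` in `baseoperator.py` recursing into an attribute, then `copy.py` failing in `reductor(4)`.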
### What you think should happen instead
_No response_
### How to reproduce
Using the `apache/airflow:slim-2.7.2-3.11` image:
```python
# test_dag.py
from __future__ import annotations

from os import environ

from airflow.decorators import dag, task
from pendulum.datetime import DateTime
from pendulum.tz import local_timezone

DEFAULT_ARGS = {
    "image": "python:3.11-slim-bullseye",
    "api_version": "auto",
    "network_mode": "container:airflow-worker",
    "docker_url": "tcp://docker-socket-proxy:2375",
    "auto_remove": "force",
    "mount_tmp_dir": False,
    "container_name": "pickle_error_test",
    "user": environ["AIRFLOW_UID"],
}


@task.python()
def no_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in celery")


@task.docker()
def pickle_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in docker")


@dag(
    start_date=DateTime.now(local_timezone()).replace(
        hour=0, minute=0, second=0, microsecond=0
    ),
    schedule=None,
    default_args=DEFAULT_ARGS | {"do_xcom_push": False},
    catchup=False,
)
def test_docker_task_error() -> None:
    in_celery = no_error()
    in_docker = pickle_error()
    # Removing this dependency line makes the error go away.
    _ = in_celery >> in_docker


test_docker_task_error()
```
### Operating System
Ubuntu 22.04.1 LTS
### Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.3.4
apache-airflow-providers-common-sql==1.7.2
apache-airflow-providers-docker==3.7.5
apache-airflow-providers-ftp==3.5.2
apache-airflow-providers-http==4.5.2
apache-airflow-providers-imap==3.3.2
apache-airflow-providers-odbc==4.0.0
apache-airflow-providers-postgres==5.6.1
apache-airflow-providers-redis==3.3.2
apache-airflow-providers-sqlite==3.4.3
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
The pickle error is raised in the executor, but the task itself runs and
exits normally (state: SUCCESS).
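Since the deep copy fails on a single attribute of the operator, probing each attribute individually shows which one is at fault. The helper below is a hypothetical diagnostic, not part of Airflow; run it against the DockerOperator instance from the DAG:

```python
import copy


def find_undeepcopyable_attrs(obj) -> list[str]:
    """Return names of instance attributes that copy.deepcopy rejects."""
    bad = []
    for name, value in vars(obj).items():
        try:
            copy.deepcopy(value)
        except TypeError:
            bad.append(name)
    return bad


# Example: an object carrying a module reference is flagged, plain data is not.
class Probe:
    def __init__(self):
        import logging

        self.mod = logging  # module reference: not deep-copyable
        self.retries = 3    # plain int: fine


print(find_undeepcopyable_attrs(Probe()))  # -> ['mod']
```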
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)