gschwim opened a new issue #14162: URL: https://github.com/apache/airflow/issues/14162
<!-- Welcome to Apache Airflow! For a smooth issue process, try to answer the following questions. Don't worry if they're not all applicable; just try to include what you can :-) If you need to include code snippets or logs, please put them in fenced code blocks. If they're super-long, please use the details tag like <details><summary>super-long log</summary> lots of stuff </details> Please delete these comment blocks before submitting the issue. --> <!-- IMPORTANT!!! PLEASE CHECK "SIMILAR TO X EXISTING ISSUES" OPTION IF VISIBLE NEXT TO "SUBMIT NEW ISSUE" BUTTON!!! PLEASE CHECK IF THIS ISSUE HAS BEEN REPORTED PREVIOUSLY USING SEARCH!!! Please complete the next sections or the issue will be closed. These questions are the first thing we need to know to understand the context. --> **Apache Airflow version**: from the docker-compose file in the tutorial, image apache/airflow:master-python3.8 and apache/airflow:master-python3.7 **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): n/a **Environment**: - **Cloud provider or hardware configuration**: Bare metal server in lab, AMD threadripper, 64GB ram - **OS** (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS BIONIC VM running atop KVM - **Kernel** (e.g. `uname -a`): Linux zxdev01 4.15.0-135-generic #139-Ubuntu SMP Mon Jan 18 17:38:24 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux - **Install tools**: docker-compose as provided in airflow tutorial w/ mods as above - **Others**: set up per the tutorial [here](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html) **What happened**: Running a version of the included example tutorial dag _taskflow_api_etl_ that is set to be on a schedule causes a crash ` docker exec -ti airflow_airflow-webserver_1 airflow dags backfill test_tutorial_taskflow_api_etl --start-date <any date> --end-date <any date> ` The following error is received: ` AttributeError: Can't pickle local object 'test_tutorial_taskflow_api_etl.<locals>.transform' ` Modifications to this dag, including removing the return values from each function and the parameters in each, yield identical results. Running a single day backfill does *not* cause this crash. Deleting the dag and letting it re-import does not help. <!-- (please include exact error messages if you can) --> **What you expected to happen**: The dag should complete the backfill for all days. I removed the return statement in all functions, and removed the parameters in each of the function definitions thinking this had to do with the passing along of return results but this did not help. I tested with multiple workers and a single worker as well as with python 3.7. The same result was achieved in all cases. <!-- What do you think went wrong? --> **How to reproduce it**: <!--- As minimally and precisely as possible. Keep in mind we do not have access to your cluster or dags. If you are using kubernetes, please attempt to recreate the issue using minikube or kind. ## Install minikube/kind - Minikube https://minikube.sigs.k8s.io/docs/start/ - Kind https://kind.sigs.k8s.io/docs/user/quick-start/ If this is a UI bug, please provide a screenshot of the bug or a link to a youtube video of the bug in action You can include images using the .md style of  To record a screencast, mac users can use QuickTime and then create an unlisted youtube video with the resulting .mov file. ---> 1. Install an ubuntu VM, versions as specified, 8GB ram, 4 cores 2. Install Docker CE from the docker repositories per their installation directions 3. Download the Airflow docker-compose image, perform the tutorial set up tasks, modify only the airflow image as noted in the versions above (this is a different bug maybe, default causes a crash). 4. `docker-compose up -d` 5. Copy the code in tutorial_taskflow_api_etl, modify to set the schedule_interval to daily, save into ./dags/test.py and let airflow discover it. 6. execute `docker exec -ti airflow_airflow-webserver_1 airflow dags backfill test_tutorial_taskflow_api_etl --start-date <any date> --end-date <any date>` **Anything else we need to know**: At least two other dag examples appear to work just fine when executed this way. Neither are using the Taskflow API. Maybe that is a contributing factor. This crash occurs consistently at every run as described. <!-- How often does this problem occur? Once? Every time etc? Any relevant logs to include? Put them here in side a detail tag: <details><summary>crash output</summary>/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py:60 PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True [2021-02-10 00:48:57,661] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags [2021-02-10 00:48:57,972] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'extract', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py'] [2021-02-10 00:48:57,984] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'transform', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py'] [2021-02-10 00:48:57,996] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'load', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py'] Traceback (most recent call last): File "/home/airflow/.local/bin/airflow", line 8, in <module> sys.exit(main()) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main args.func(args) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command return func(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper return f(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill dag.run( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 1706, in run job.run() File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run self._execute() File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper return func(*args, session=session, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute self._execute_for_run_dates( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper return func(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates processed_dag_run_dates = self._process_backfill_task_instances( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper return func(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 602, in _process_backfill_task_instances executor.heartbeat() File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/base_executor.py", line 158, in heartbeat self.trigger_tasks(open_slots) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 263, in trigger_tasks self._process_tasks(task_tuples_to_send) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 272, in _process_tasks key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 331, in _send_tasks_to_celery key_and_async_results = send_pool.map( File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 364, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 771, in get raise self._value File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks put(task) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/usr/local/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'test_tutorial_taskflow_api_etl.<locals>.transform' </details> --> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
