gschwim opened a new issue #14162:
URL: https://github.com/apache/airflow/issues/14162


   
   
   **Apache Airflow version**: master, via the tutorial docker-compose file, using images apache/airflow:master-python3.8 and apache/airflow:master-python3.7
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): n/a
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Bare-metal server in lab, AMD Threadripper, 64 GB RAM
   - **OS** (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS (Bionic) VM running atop KVM
   - **Kernel** (e.g. `uname -a`): Linux zxdev01 4.15.0-135-generic #139-Ubuntu SMP Mon Jan 18 17:38:24 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
   - **Install tools**: docker-compose as provided in the Airflow tutorial, with the image modifications noted above
   - **Others**: set up per the tutorial [here](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html)
   
   **What happened**:
   
   Running a copy of the included example DAG _tutorial_taskflow_api_etl_, modified to run on a schedule, causes a crash when backfilled:

   ```
   docker exec -ti airflow_airflow-webserver_1 airflow dags backfill test_tutorial_taskflow_api_etl --start-date <any date> --end-date <any date>
   ```
   
   The following error is received:
   
   ```
   AttributeError: Can't pickle local object 'test_tutorial_taskflow_api_etl.<locals>.transform'
   ```
   
   Modifications to this DAG, including removing the return values and the parameters from each function, yield identical results. Running a single-day backfill does *not* cause this crash. Deleting the DAG and letting it re-import does not help.
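For what it's worth, this looks like the standard CPython restriction that a function defined inside another function cannot be pickled, because pickle serializes functions by qualified name. A minimal sketch with no Airflow involved (the names are illustrative, not Airflow's):

```python
import pickle

def my_dag():
    # TaskFlow-style tasks are plain functions defined *inside* the
    # @dag-decorated function, so to pickle they are "local objects".
    def transform(x):
        return x + 1
    return transform

task_fn = my_dag()
try:
    pickle.dumps(task_fn)
except AttributeError as exc:
    print(exc)  # Can't pickle local object 'my_dag.<locals>.transform'
```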
   
   
   **What you expected to happen**:

   The DAG should complete the backfill for all days. Thinking this had to do with passing return results between tasks, I removed the return statements and the parameters from all the function definitions, but this did not help.
   
   I tested with multiple workers and a single worker, as well as with Python 3.7; the result was the same in all cases.
   
   
   
   **How to reproduce it**:
   1. Install an Ubuntu VM, versions as specified, 8 GB RAM, 4 cores.
   2. Install Docker CE from the Docker repositories per their installation directions.
   3. Download the Airflow docker-compose file, perform the tutorial setup tasks, and modify only the Airflow image as noted in the versions above (the default image causes a crash, possibly a different bug).
   4. `docker-compose up -d`
   5. Copy the code from tutorial_taskflow_api_etl, modify it to set schedule_interval to daily, save it as ./dags/test.py, and let Airflow discover it.
   6. Execute `docker exec -ti airflow_airflow-webserver_1 airflow dags backfill test_tutorial_taskflow_api_etl --start-date <any date> --end-date <any date>`
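If it helps triage, the same failure reproduces outside Airflow with a bare `multiprocessing.Pool`, which (per the traceback in the crash output) is the mechanism `celery_executor._send_tasks_to_celery` uses via `send_pool.map`. This is a hedged sketch, not Airflow code; the names are made up:

```python
import multiprocessing

def main():
    # Like a @task function defined inside a @dag function: a local object
    # that the pool cannot pickle when dispatching work to its processes.
    def transform(value):
        return value * 2

    with multiprocessing.Pool(2) as pool:
        try:
            pool.map(transform, [1, 2, 3])
        except AttributeError as exc:
            return str(exc)  # Can't pickle local object 'main.<locals>.transform'
    return None

if __name__ == "__main__":
    print(main())
```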
   
   **Anything else we need to know**:
   
   At least two other example DAGs work fine when executed this way. Neither uses the TaskFlow API, which may be a contributing factor.
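That difference is consistent with the pickling theory: classic-operator examples pass module-level callables, which pickle fine by qualified name, while TaskFlow wraps functions whose `__qualname__` contains `<locals>`. A small illustration (identifiers invented for the example):

```python
import pickle

def classic_callable():
    # Module-level, like a python_callable in a non-TaskFlow DAG file.
    return "ok"

def taskflow_style_dag():
    def transform():
        # Nested, like a @task inside a @dag-decorated function.
        return "ok"
    return transform

nested = taskflow_style_dag()
print(classic_callable.__qualname__)  # classic_callable
print(nested.__qualname__)            # taskflow_style_dag.<locals>.transform

pickle.dumps(classic_callable)  # succeeds: importable by qualified name
# pickle.dumps(nested)          # would raise: Can't pickle local object ...
```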
   
   This crash occurs consistently at every run as described. 
   <details><summary>crash output</summary>

   ```
   /home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py:60 PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True
   [2021-02-10 00:48:57,661] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags
   [2021-02-10 00:48:57,972] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'extract', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py']
   [2021-02-10 00:48:57,984] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'transform', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py']
   [2021-02-10 00:48:57,996] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_tutorial_taskflow_api_etl', 'load', '2021-01-04T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/etl1.py']
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
       dag.run(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 1706, in run
       job.run()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
       self._execute_for_run_dates(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
       processed_dag_run_dates = self._process_backfill_task_instances(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 602, in _process_backfill_task_instances
       executor.heartbeat()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/base_executor.py", line 158, in heartbeat
       self.trigger_tasks(open_slots)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 263, in trigger_tasks
       self._process_tasks(task_tuples_to_send)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 272, in _process_tasks
       key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 331, in _send_tasks_to_celery
       key_and_async_results = send_pool.map(
     File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 364, in map
       return self._map_async(func, iterable, mapstar, chunksize).get()
     File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 771, in get
       raise self._value
     File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
       put(task)
     File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
   AttributeError: Can't pickle local object 'test_tutorial_taskflow_api_etl.<locals>.transform'
   ```

   </details>
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

