lafrinte opened a new issue #11451:
URL: https://github.com/apache/airflow/issues/11451
1. version info:
* apache-airflow: 1.10.12
* os: centos7.4
* python: 3.7.2
* apache-airflow executor: dask
2. error msg:
```
-bash-4.2$ airflow scheduler
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2020-07-16 18:48:30,885] {__init__.py:51} INFO - Using executor DaskExecutor
[2020-07-16 18:48:30,890] {scheduler_job.py:1346} INFO - Starting the
scheduler
[2020-07-16 18:48:30,890] {scheduler_job.py:1354} INFO - Running execute
loop for -1 seconds
[2020-07-16 18:48:30,890] {scheduler_job.py:1355} INFO - Processing each
file at most -1 times
[2020-07-16 18:48:30,890] {scheduler_job.py:1358} INFO - Searching for files
in /ansdep/opsapi/dags
[2020-07-16 18:48:30,890] {scheduler_job.py:1360} INFO - There are 0 files
in /ansdep/opsapi/dags
[2020-07-16 18:48:30,898] {scheduler_job.py:1411} INFO - Resetting orphaned
tasks for active dag runs
[2020-07-16 18:48:30,912] {dag_processing.py:556} INFO - Launched
DagFileProcessorManager with pid: 70385
[2020-07-16 18:48:30,916] {settings.py:54} INFO - Configured default
timezone <Timezone [Asia/Shanghai]>
[2020-07-16 18:49:22,984] {scheduler_job.py:951} INFO - 1 tasks up for
execution:
<TaskInstance:
bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16
10:49:00+00:00 [scheduled]>
[2020-07-16 18:49:22,990] {scheduler_job.py:982} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 128 open slots and 1 task instances
ready to be queued
[2020-07-16 18:49:22,990] {scheduler_job.py:1010} INFO - DAG
bc081861-e948-4e59-92fa-fc14f4330eb2 has 0/512 running and queued tasks
[2020-07-16 18:49:22,994] {scheduler_job.py:1060} INFO - Setting the
following tasks to queued state:
<TaskInstance:
bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16
10:49:00+00:00 [scheduled]>
[2020-07-16 18:49:23,000] {scheduler_job.py:1134} INFO - Setting the
following 1 tasks to queued state:
<TaskInstance:
bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16
10:49:00+00:00 [queued]>
[2020-07-16 18:49:23,000] {scheduler_job.py:1170} INFO - Sending
('bc081861-e948-4e59-92fa-fc14f4330eb2', 'first_for_flush_state',
datetime.datetime(2020, 7, 16, 10, 49, tzinfo=<TimezoneInfo [UTC, GMT,
+00:00:00, STD]>), 1) to executor with priority 4 and queue default
[2020-07-16 18:49:23,000] {base_executor.py:58} INFO - Adding to queue:
['airflow', 'run', 'bc081861-e948-4e59-92fa-fc14f4330eb2',
'first_for_flush_state', '2020-07-16T10:49:00+00:00', '--local', '--pool',
'default_pool', '-sd',
'/ansdep/opsapi/dags/bc081861-e948-4e59-92fa-fc14f4330eb2.py']
/ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py:63:
UserWarning: DaskExecutor does not support queues. All tasks will be run in
the same cluster
'DaskExecutor does not support queues. '
distributed.protocol.pickle - INFO - Failed to serialize <function
DaskExecutor.execute_async.<locals>.airflow_run at 0x7f748ce808c8>. Exception:
Cell is empty
[2020-07-16 18:49:23,003] {scheduler_job.py:1384} ERROR - Exception when
executing execute_helper
Traceback (most recent call last):
File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py",
line 3323, in dumps_function
result = cache_dumps[func]
File "/ansdep/python3/lib/python3.7/site-packages/distributed/utils.py",
line 1549, in __getitem__
value = super().__getitem__(key)
File "/ansdep/python3/lib/python3.7/collections/__init__.py", line 1025,
in __getitem__
raise KeyError(key)
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at
0x7f748ce808c8>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 41, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object
'DaskExecutor.execute_async.<locals>.airflow_run'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1382, in _execute
self._execute_helper()
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1443, in _execute_helper
if not
self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1505, in _validate_and_run_task_instances
self.executor.heartbeat()
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 130, in heartbeat
self.trigger_tasks(open_slots)
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 154, in trigger_tasks
executor_config=simple_ti.executor_config)
File
"/ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py",
line 70, in execute_async
future = self.client.submit(airflow_run, pure=False)
File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py",
line 1579, in submit
actors=actor,
File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py",
line 2598, in _graph_to_futures
"tasks": valmap(dumps_task, dsk3),
File "/ansdep/python3/lib/python3.7/site-packages/toolz/dicttoolz.py",
line 83, in valmap
rv.update(zip(iterkeys(d), map(func, itervalues(d))))
File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py",
line 3361, in dumps_task
return {"function": dumps_function(task[0]), "args":
warn_dumps(task[1:])}
File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py",
line 3325, in dumps_function
result = pickle.dumps(func)
File
"/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 52, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File
"/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 101, in dumps
cp.dump(obj)
File
"/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 540, in dump
return Pickler.dump(self, obj)
File "/ansdep/python3/lib/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File
"/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File
"/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 659, in _save_reduce_pickle5
dictitems=dictitems, obj=obj
File "/ansdep/python3/lib/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/ansdep/python3/lib/python3.7/pickle.py", line 786, in save_tuple
save(element)
File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/ansdep/python3/lib/python3.7/pickle.py", line 771, in save_tuple
save(element)
File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/ansdep/python3/lib/python3.7/site-packages/dill/_dill.py", line
1169, in save_cell
f = obj.cell_contents
ValueError: Cell is empty
[2020-07-16 18:49:23,014] {helpers.py:325} INFO - Sending Signals.SIGTERM to
GPID 70385
[2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process
psutil.Process(pid=70385, status='terminated') (70385) terminated with exit
code 0
[2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process
psutil.Process(pid=70575, status='terminated') (70575) terminated with exit
code None
[2020-07-16 18:49:23,042] {scheduler_job.py:1387} INFO - Exited execute loop
```
The pickle error occurred in distributed (dask) worker.py:
* distributed.worker.dumps_function
```
def dumps_function(func):
""" Dump a function to bytes, cache functions """
try:
with _cache_lock:
result = cache_dumps[func]
except KeyError:
result = pickle.dumps(func) # <-- error occurs here
if len(result) < 100000:
with _cache_lock:
cache_dumps[func] = result
except TypeError: # Unhashable function
result = pickle.dumps(func)
return result
```
The `func` argument passed to dumps_function comes from
airflow.executors.dask_executor.DaskExecutor.execute_async:
```
def execute_async(self, key, command, queue=None, executor_config=None):
if queue is not None:
warnings.warn(
'DaskExecutor does not support queues. '
'All tasks will be run in the same cluster'
)
if command[0:2] != ["airflow", "run"]:
raise ValueError('The command must start with ["airflow",
"run"].')
def airflow_run():
return subprocess.check_call(command, close_fds=True)
future = self.client.submit(airflow_run, pure=False) # <-- here the
airflow_run func object is passed on to dumps_function
self.futures[future] = key
```
To understand the cause, I tried to reproduce pickling a nested function in IPython.
Class A stands in for dumps_function, which expects a function object to
pickle; class B stands in for the executor:
```
In [1]: import pickle, subprocess
In [2]: class A:
...: def __init__(self, func):
...: self.func = func
...: def dumps(self):
...: print(self.func.__name__, ' ', self.func.__module__)
...: return pickle.dumps(self.func)
...:
In [3]: class B:
...: def __init__(self):
...: self.a = None
...: def a_method(self, cmd):
...: def run_command():
...: return subprocess.check_call(cmd, close_fds=True)
...: self.a = A(run_command)
...: return self.a.dumps()
...:
...:
In [4]: b = B()
In [5]: b.a_method('sleep 100')
run_command __main__
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-18-dce0583e3bfc> in <module>
----> 1 b.a_method('sleep 100')
<ipython-input-16-7100883a1a96> in a_method(self, cmd)
6 return subprocess.check_call(cmd, close_fds=True)
7 self.a = A(run_command)
----> 8 return self.a.dumps()
9
10
<ipython-input-15-073409fdb9f1> in dumps(self)
4 def dumps(self):
5 print(self.func.__name__, ' ', self.func.__module__)
----> 6 return pickle.dumps(self.func)
7
AttributeError: Can't pickle local object 'B.a_method.<locals>.run_command'
```
I think the airflow_run function being defined locally inside execute_async
causes the pickle error,
so I changed the code as follows and the scheduler now works well:
```
def airflow_run(command):
return subprocess.check_call(command, close_fds=True)
class DaskExecutor(BaseExecutor):
...
def execute_async(self, key, command, queue=None, executor_config=None):
if queue is not None:
warnings.warn(
'DaskExecutor does not support queues. '
'All tasks will be run in the same cluster'
)
if command[0:2] != ["airflow", "run"]:
raise ValueError('The command must start with ["airflow",
"run"].')
future = self.client.submit(airflow_run, command, pure=False)
self.futures[future] = key
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]