lafrinte opened a new issue #11451:
URL: https://github.com/apache/airflow/issues/11451


   1. Version info:
   * apache-airflow: 1.10.12
   * os: centos7.4
   * python: 3.7.2
   * apache-airflow executor: dask
   
   2. Error message:
   
   ```
   -bash-4.2$ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2020-07-16 18:48:30,885] {__init__.py:51} INFO - Using executor DaskExecutor
   [2020-07-16 18:48:30,890] {scheduler_job.py:1346} INFO - Starting the scheduler
   [2020-07-16 18:48:30,890] {scheduler_job.py:1354} INFO - Running execute loop for -1 seconds
   [2020-07-16 18:48:30,890] {scheduler_job.py:1355} INFO - Processing each file at most -1 times
   [2020-07-16 18:48:30,890] {scheduler_job.py:1358} INFO - Searching for files in /ansdep/opsapi/dags
   [2020-07-16 18:48:30,890] {scheduler_job.py:1360} INFO - There are 0 files in /ansdep/opsapi/dags
   [2020-07-16 18:48:30,898] {scheduler_job.py:1411} INFO - Resetting orphaned tasks for active dag runs
   [2020-07-16 18:48:30,912] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 70385
   [2020-07-16 18:48:30,916] {settings.py:54} INFO - Configured default timezone <Timezone [Asia/Shanghai]>
   [2020-07-16 18:49:22,984] {scheduler_job.py:951} INFO - 1 tasks up for execution:
           <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [scheduled]>
   [2020-07-16 18:49:22,990] {scheduler_job.py:982} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
   [2020-07-16 18:49:22,990] {scheduler_job.py:1010} INFO - DAG bc081861-e948-4e59-92fa-fc14f4330eb2 has 0/512 running and queued tasks
   [2020-07-16 18:49:22,994] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
           <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [scheduled]>
   [2020-07-16 18:49:23,000] {scheduler_job.py:1134} INFO - Setting the following 1 tasks to queued state:
           <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [queued]>
   [2020-07-16 18:49:23,000] {scheduler_job.py:1170} INFO - Sending ('bc081861-e948-4e59-92fa-fc14f4330eb2', 'first_for_flush_state', datetime.datetime(2020, 7, 16, 10, 49, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 4 and queue default
   [2020-07-16 18:49:23,000] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'bc081861-e948-4e59-92fa-fc14f4330eb2', 'first_for_flush_state', '2020-07-16T10:49:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/ansdep/opsapi/dags/bc081861-e948-4e59-92fa-fc14f4330eb2.py']
   /ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py:63: UserWarning: DaskExecutor does not support queues. All tasks will be run in the same cluster
     'DaskExecutor does not support queues. '
   distributed.protocol.pickle - INFO - Failed to serialize <function DaskExecutor.execute_async.<locals>.airflow_run at 0x7f748ce808c8>. Exception: Cell is empty
   [2020-07-16 18:49:23,003] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
   Traceback (most recent call last):
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3323, in dumps_function
       result = cache_dumps[func]
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/utils.py", line 1549, in __getitem__
       value = super().__getitem__(key)
     File "/ansdep/python3/lib/python3.7/collections/__init__.py", line 1025, in __getitem__
       raise KeyError(key)
   KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 0x7f748ce808c8>
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 41, in dumps
       result = pickle.dumps(x, **dump_kwargs)
   AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
       self._execute_helper()
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1443, in _execute_helper
       if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1505, in _validate_and_run_task_instances
       self.executor.heartbeat()
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
       self.trigger_tasks(open_slots)
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
       executor_config=simple_ti.executor_config)
     File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py", line 70, in execute_async
       future = self.client.submit(airflow_run, pure=False)
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py", line 1579, in submit
       actors=actor,
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py", line 2598, in _graph_to_futures
       "tasks": valmap(dumps_task, dsk3),
     File "/ansdep/python3/lib/python3.7/site-packages/toolz/dicttoolz.py", line 83, in valmap
       rv.update(zip(iterkeys(d), map(func, itervalues(d))))
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3361, in dumps_task
       return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3325, in dumps_function
       result = pickle.dumps(func)
     File "/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 52, in dumps
       result = cloudpickle.dumps(x, **dump_kwargs)
     File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
       cp.dump(obj)
     File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
       return Pickler.dump(self, obj)
     File "/ansdep/python3/lib/python3.7/pickle.py", line 437, in dump
       self.save(obj)
     File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
       *self._dynamic_function_reduce(obj), obj=obj
     File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 659, in _save_reduce_pickle5
       dictitems=dictitems, obj=obj
     File "/ansdep/python3/lib/python3.7/pickle.py", line 638, in save_reduce
       save(args)
     File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/ansdep/python3/lib/python3.7/pickle.py", line 786, in save_tuple
       save(element)
     File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/ansdep/python3/lib/python3.7/pickle.py", line 771, in save_tuple
       save(element)
     File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/ansdep/python3/lib/python3.7/site-packages/dill/_dill.py", line 1169, in save_cell
       f = obj.cell_contents
   ValueError: Cell is empty
   [2020-07-16 18:49:23,014] {helpers.py:325} INFO - Sending Signals.SIGTERM to GPID 70385
   [2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process psutil.Process(pid=70385, status='terminated') (70385) terminated with exit code 0
   [2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process psutil.Process(pid=70575, status='terminated') (70575) terminated with exit code None
   [2020-07-16 18:49:23,042] {scheduler_job.py:1387} INFO - Exited execute loop
   ```
   
   The pickle error occurs in `worker.py` of `distributed` (Dask):
   
   * `distributed.worker.dumps_function`
   
   ```
   def dumps_function(func):
       """ Dump a function to bytes, cache functions """
       try:
           with _cache_lock:
               result = cache_dumps[func]
       except KeyError:
        result = pickle.dumps(func)  # <-- error raised here
           if len(result) < 100000:
               with _cache_lock:
                   cache_dumps[func] = result
       except TypeError:  # Unhashable function
           result = pickle.dumps(func)
       return result
   ```
   
   The `func` argument passed to `dumps_function` comes from `airflow.executors.dask_executor.DaskExecutor.execute_async`:
   
   ```
       def execute_async(self, key, command, queue=None, executor_config=None):
           if queue is not None:
               warnings.warn(
                   'DaskExecutor does not support queues. '
                   'All tasks will be run in the same cluster'
               )
   
        if command[0:2] != ["airflow", "run"]:
            raise ValueError('The command must start with ["airflow", "run"].')

        def airflow_run():
            return subprocess.check_call(command, close_fds=True)

        future = self.client.submit(airflow_run, pure=False)  # <-- passes the airflow_run func object to dumps_function
        self.futures[future] = key
   ```
   
   To confirm this, I mocked pickling a nested function (a "func in a func") in IPython.
   
   Class `A` stands in for `dumps_function`, which expects a function object to pickle; class `B` stands in for the executor:
   
   ```
   In [1]: import pickle, subprocess
   
   In [2]: class A:
       ...:     def __init__(self, func):
       ...:         self.func = func
       ...:     def dumps(self):
       ...:         print(self.func.__name__, ' ', self.func.__module__)
       ...:         return pickle.dumps(self.func)
       ...:
   
   In [3]: class B:
       ...:     def __init__(self):
       ...:         self.a = None
       ...:     def a_method(self, cmd):
       ...:         def run_command():
       ...:             return subprocess.check_call(cmd, close_fds=True)
       ...:         self.a = A(run_command)
       ...:         return self.a.dumps()
       ...:
       ...:
   
   In [4]: b = B()
   
   In [5]: b.a_method('sleep 100')
   run_command   __main__
   ---------------------------------------------------------------------------
   AttributeError                            Traceback (most recent call last)
   <ipython-input-18-dce0583e3bfc> in <module>
   ----> 1 b.a_method('sleep 100')
   
   <ipython-input-16-7100883a1a96> in a_method(self, cmd)
         6             return subprocess.check_call(cmd, close_fds=True)
         7         self.a = A(run_command)
   ----> 8         return self.a.dumps()
         9
        10
   
   <ipython-input-15-073409fdb9f1> in dumps(self)
         4     def dumps(self):
         5         print(self.func.__name__, ' ', self.func.__module__)
   ----> 6         return pickle.dumps(self.func)
         7
   
   AttributeError: Can't pickle local object 'B.a_method.<locals>.run_command'
   ```
   
   I think the `airflow_run` function defined inside `execute_async` is what causes the pickle error.
   
   So I moved the function to module level, and the scheduler now works:
   
   ```
    def airflow_run(command):
        # Module-level function: picklable by reference, unlike the nested closure.
        return subprocess.check_call(command, close_fds=True)
   
   
   class DaskExecutor(BaseExecutor):
   ...
       def execute_async(self, key, command, queue=None, executor_config=None):
           if queue is not None:
               warnings.warn(
                   'DaskExecutor does not support queues. '
                   'All tasks will be run in the same cluster'
               )
   
           if command[0:2] != ["airflow", "run"]:
            raise ValueError('The command must start with ["airflow", "run"].')
   
           future = self.client.submit(airflow_run, command, pure=False) 
           
           self.futures[future] = key
   ```
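   As a quick sanity check (a standalone sketch, not Airflow code), a module-level function round-trips through plain `pickle`, while the equivalent nested closure fails the same way `airflow_run` did:
   
   ```
   import pickle
   import subprocess
   
   def airflow_run(command):
       # Module-level: pickled by reference (module name + qualified name).
       return subprocess.check_call(command, close_fds=True)
   
   def make_closure(command):
       # Nested function: has no importable qualified name, so plain pickle fails.
       def run():
           return subprocess.check_call(command, close_fds=True)
       return run
   
   # The module-level function round-trips fine.
   restored = pickle.loads(pickle.dumps(airflow_run))
   assert restored is airflow_run
   
   # The closure cannot be pickled.
   closure_pickle_failed = False
   try:
       pickle.dumps(make_closure(["echo", "ok"]))
   except (AttributeError, pickle.PicklingError):
       closure_pickle_failed = True
   assert closure_pickle_failed
   ```
   
   Note that `distributed` normally falls back to `cloudpickle`, which can serialize closures; in this environment that fallback also failed (the `dill` `save_cell` hook raised `ValueError: Cell is empty`, per the traceback above), so passing `command` as an argument and keeping the function at module level sidesteps closure serialization entirely.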
   
   
   

