casra-developers commented on pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#issuecomment-907070123


   Sorry for taking so long with the testing; I had to work on several other 
projects with approaching due dates, so I had to postpone testing the rebased 
branch. Also, since we have no experience with yarn/Python web backends, it 
took some time to figure out how to build the web assets, but the webserver, 
scheduler, and worker are all able to communicate once more.
   
   When testing a very simple DAG, the Airflow scheduler crashes with the 
following traceback:
   ```
   distributed.protocol.pickle - INFO - Failed to serialize <function 
DaskExecutor.execute_async.<locals>.airflow_run at 0x7f0ba17de8c0>. Exception: 
Cell is empty
   --- Logging error ---
   Traceback (most recent call last):
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3525, in dumps_function
       result = cache_dumps[func]
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/utils.py",
 line 1497, in __getitem__
       value = super().__getitem__(key)
     File "/usr/lib/python3.7/collections/__init__.py", line 1027, in 
__getitem__
       raise KeyError(key)
   KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 
0x7f0ba17de8c0>
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
 line 49, in dumps
       result = pickle.dumps(x, **dump_kwargs)
   AttributeError: Can't pickle local object 
'DaskExecutor.execute_async.<locals>.airflow_run'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 652, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 735, in _run_scheduler_loop
       self.executor.heartbeat()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 158, in heartbeat
       self.trigger_tasks(open_slots)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 188, in trigger_tasks
       self.execute_async(key=key, command=command, queue=queue, 
executor_config=ti.executor_config)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/dask_executor.py",
 line 84, in execute_async
       future = self.client.submit(airflow_run, pure=False)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
 line 1596, in submit
       actors=actor,
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
 line 2554, in _graph_to_futures
       dsk = dsk.__dask_distributed_pack__(self, keyset)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
 line 959, in __dask_distributed_pack__
       client_keys,
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
 line 392, in __dask_distributed_pack__
       dsk = toolz.valmap(dumps_task, dsk)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/toolz/dicttoolz.py",
 line 83, in valmap
       rv.update(zip(d.keys(), map(func, d.values())))
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3563, in dumps_task
       return {"function": dumps_function(task[0]), "args": 
warn_dumps(task[1:])}
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3527, in dumps_function
       result = pickle.dumps(func, protocol=4)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
 line 60, in dumps
       result = cloudpickle.dumps(x, **dump_kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 101, in dumps
       cp.dump(obj)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 540, in dump
       return Pickler.dump(self, obj)
     File "/usr/lib/python3.7/pickle.py", line 437, in dump
       self.save(obj)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 722, in save_function
       *self._dynamic_function_reduce(obj), obj=obj
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 659, in _save_reduce_pickle5
       dictitems=dictitems, obj=obj
     File "/usr/lib/python3.7/pickle.py", line 638, in save_reduce
       save(args)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
       save(element)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/usr/lib/python3.7/pickle.py", line 774, in save_tuple
       save(element)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dill/_dill.py",
 line 1226, in save_cell
       f = obj.cell_contents
   ValueError: Cell is empty
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/lib/python3.7/logging/__init__.py", line 1025, in emit
       msg = self.format(record)
     File "/usr/lib/python3.7/logging/__init__.py", line 869, in format
       return fmt.format(record)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/log/colored_log.py",
 line 95, in format
       record = self._color_record_traceback(record)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/log/colored_log.py",
 line 86, in _color_record_traceback
       self.color(self.log_colors, record.levelname) + record.exc_text + 
escape_codes['reset']
   AttributeError: 'CustomTTYColoredFormatter' object has no attribute 'color'
   Call stack:
     File "/home/airflow/test_environments/venv/bin/airflow", line 8, in 
<module>
       sys.exit(main())
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/__main__.py",
 line 41, in main
       args.func(args)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
 line 48, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/cli.py",
 line 91, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 70, in scheduler
       job.run()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 245, in run
       self._execute()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 668, in _execute
       self.log.exception("Exception when executing 
SchedulerJob._run_scheduler_loop")
   Message: 'Exception when executing SchedulerJob._run_scheduler_loop'
   Arguments: ()
   [2021-08-27 11:27:25,761] {process_utils.py:108} INFO - Sending 
Signals.SIGTERM to GPID 93904
   [2021-08-27 11:27:25,894] {process_utils.py:70} INFO - Process 
psutil.Process(pid=93904, status='terminated', exitcode=0, started='11:27:24') 
(93904) terminated with exit code 0
   [2021-08-27 11:27:25,895] {scheduler_job.py:679} INFO - Exited execute loop
   Traceback (most recent call last):
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3525, in dumps_function
       result = cache_dumps[func]
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/utils.py",
 line 1497, in __getitem__
       value = super().__getitem__(key)
     File "/usr/lib/python3.7/collections/__init__.py", line 1027, in 
__getitem__
       raise KeyError(key)
   KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 
0x7f0ba17de8c0>
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
 line 49, in dumps
       result = pickle.dumps(x, **dump_kwargs)
   AttributeError: Can't pickle local object 
'DaskExecutor.execute_async.<locals>.airflow_run'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/airflow/test_environments/venv/bin/airflow", line 8, in 
<module>
       sys.exit(main())
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/__main__.py",
 line 41, in main
       args.func(args)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
 line 48, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/cli.py",
 line 91, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 70, in scheduler
       job.run()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 245, in run
       self._execute()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 652, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 735, in _run_scheduler_loop
       self.executor.heartbeat()
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 158, in heartbeat
       self.trigger_tasks(open_slots)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 188, in trigger_tasks
       self.execute_async(key=key, command=command, queue=queue, 
executor_config=ti.executor_config)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/dask_executor.py",
 line 84, in execute_async
       future = self.client.submit(airflow_run, pure=False)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
 line 1596, in submit
       actors=actor,
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
 line 2554, in _graph_to_futures
       dsk = dsk.__dask_distributed_pack__(self, keyset)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
 line 959, in __dask_distributed_pack__
       client_keys,
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
 line 392, in __dask_distributed_pack__
       dsk = toolz.valmap(dumps_task, dsk)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/toolz/dicttoolz.py",
 line 83, in valmap
       rv.update(zip(d.keys(), map(func, d.values())))
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3563, in dumps_task
       return {"function": dumps_function(task[0]), "args": 
warn_dumps(task[1:])}
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
 line 3527, in dumps_function
       result = pickle.dumps(func, protocol=4)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
 line 60, in dumps
       result = cloudpickle.dumps(x, **dump_kwargs)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 101, in dumps
       cp.dump(obj)
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 540, in dump
       return Pickler.dump(self, obj)
     File "/usr/lib/python3.7/pickle.py", line 437, in dump
       self.save(obj)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 722, in save_function
       *self._dynamic_function_reduce(obj), obj=obj
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
 line 659, in _save_reduce_pickle5
       dictitems=dictitems, obj=obj
     File "/usr/lib/python3.7/pickle.py", line 638, in save_reduce
       save(args)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
       save(element)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File "/usr/lib/python3.7/pickle.py", line 774, in save_tuple
       save(element)
     File "/usr/lib/python3.7/pickle.py", line 504, in save
       f(self, obj) # Call unbound method with explicit self
     File 
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dill/_dill.py",
 line 1226, in save_cell
       f = obj.cell_contents
   ValueError: Cell is empty
   
   ```
   It seems to me that something is wrong in the `_run_scheduler_loop` 
function in scheduler_job.py. Maybe there are inconsistencies in the data 
models, even though I've run `airflow db upgrade` without any errors.
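   For what it's worth, the first error in the log (`Can't pickle local 
object 'DaskExecutor.execute_async.<locals>.airflow_run'`) can be reproduced 
outside Airflow: the standard `pickle` module refuses any function defined 
inside another function, because such a function cannot be re-imported by its 
qualified name on the worker. A minimal sketch (the names below are 
illustrative stand-ins, not Airflow's actual code):

   ```python
   import pickle

   def execute_async():
       # airflow_run stands in for the closure that DaskExecutor defines
       # locally before handing it to client.submit(); pickle cannot
       # serialize it because it is not importable by qualified name.
       def airflow_run():
           pass

       try:
           pickle.dumps(airflow_run)
       except (AttributeError, pickle.PicklingError) as exc:
           # e.g. "Can't pickle local object 'execute_async.<locals>.airflow_run'"
           print(f"serialization failed: {exc}")

   execute_async()
   ```

   cloudpickle can usually serialize local functions, so the later 
`ValueError: Cell is empty` from `dill/_dill.py` may mean the fallback path 
hit a closure cell that had not been populated yet; that is only a guess on 
my part, but it would suggest a serialization problem in the executor rather 
than a database issue.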
   
   I hope this helps to resolve the hopefully trivial issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
