casra-developers commented on pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#issuecomment-907070123
Sorry for taking so long with the testing; I had to work on several other
projects with approaching due dates, so testing of the rebased branch was
postponed. Also, since we have no experience with yarn/Python web backends,
it took some time to figure out how to build the web assets, but the
webserver, scheduler and worker are all able to communicate once more.
When testing a very simple DAG, the airflow scheduler crashes with the
following traceback:
```
distributed.protocol.pickle - INFO - Failed to serialize <function
DaskExecutor.execute_async.<locals>.airflow_run at 0x7f0ba17de8c0>. Exception:
Cell is empty
--- Logging error ---
Traceback (most recent call last):
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3525, in dumps_function
result = cache_dumps[func]
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/utils.py",
line 1497, in __getitem__
value = super().__getitem__(key)
File "/usr/lib/python3.7/collections/__init__.py", line 1027, in
__getitem__
raise KeyError(key)
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at
0x7f0ba17de8c0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object
'DaskExecutor.execute_async.<locals>.airflow_run'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 652, in _execute
self._run_scheduler_loop()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 735, in _run_scheduler_loop
self.executor.heartbeat()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 158, in heartbeat
self.trigger_tasks(open_slots)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 188, in trigger_tasks
self.execute_async(key=key, command=command, queue=queue,
executor_config=ti.executor_config)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/dask_executor.py",
line 84, in execute_async
future = self.client.submit(airflow_run, pure=False)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
line 1596, in submit
actors=actor,
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
line 2554, in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
line 959, in __dask_distributed_pack__
client_keys,
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
line 392, in __dask_distributed_pack__
dsk = toolz.valmap(dumps_task, dsk)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/toolz/dicttoolz.py",
line 83, in valmap
rv.update(zip(d.keys(), map(func, d.values())))
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3563, in dumps_task
return {"function": dumps_function(task[0]), "args":
warn_dumps(task[1:])}
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3527, in dumps_function
result = pickle.dumps(func, protocol=4)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 60, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 101, in dumps
cp.dump(obj)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 540, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 659, in _save_reduce_pickle5
dictitems=dictitems, obj=obj
File "/usr/lib/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dill/_dill.py",
line 1226, in save_cell
f = obj.cell_contents
ValueError: Cell is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.7/logging/__init__.py", line 1025, in emit
msg = self.format(record)
File "/usr/lib/python3.7/logging/__init__.py", line 869, in format
return fmt.format(record)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/log/colored_log.py",
line 95, in format
record = self._color_record_traceback(record)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/log/colored_log.py",
line 86, in _color_record_traceback
self.color(self.log_colors, record.levelname) + record.exc_text +
escape_codes['reset']
AttributeError: 'CustomTTYColoredFormatter' object has no attribute 'color'
Call stack:
File "/home/airflow/test_environments/venv/bin/airflow", line 8, in
<module>
sys.exit(main())
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/__main__.py",
line 41, in main
args.func(args)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/cli.py",
line 91, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
line 70, in scheduler
job.run()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 245, in run
self._execute()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 668, in _execute
self.log.exception("Exception when executing
SchedulerJob._run_scheduler_loop")
Message: 'Exception when executing SchedulerJob._run_scheduler_loop'
Arguments: ()
[2021-08-27 11:27:25,761] {process_utils.py:108} INFO - Sending
Signals.SIGTERM to GPID 93904
[2021-08-27 11:27:25,894] {process_utils.py:70} INFO - Process
psutil.Process(pid=93904, status='terminated', exitcode=0, started='11:27:24')
(93904) terminated with exit code 0
[2021-08-27 11:27:25,895] {scheduler_job.py:679} INFO - Exited execute loop
Traceback (most recent call last):
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3525, in dumps_function
result = cache_dumps[func]
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/utils.py",
line 1497, in __getitem__
value = super().__getitem__(key)
File "/usr/lib/python3.7/collections/__init__.py", line 1027, in
__getitem__
raise KeyError(key)
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at
0x7f0ba17de8c0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object
'DaskExecutor.execute_async.<locals>.airflow_run'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/test_environments/venv/bin/airflow", line 8, in
<module>
sys.exit(main())
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/__main__.py",
line 41, in main
args.func(args)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/utils/cli.py",
line 91, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
line 70, in scheduler
job.run()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 245, in run
self._execute()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 652, in _execute
self._run_scheduler_loop()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 735, in _run_scheduler_loop
self.executor.heartbeat()
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 158, in heartbeat
self.trigger_tasks(open_slots)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 188, in trigger_tasks
self.execute_async(key=key, command=command, queue=queue,
executor_config=ti.executor_config)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/airflow/executors/dask_executor.py",
line 84, in execute_async
future = self.client.submit(airflow_run, pure=False)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
line 1596, in submit
actors=actor,
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/client.py",
line 2554, in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
line 959, in __dask_distributed_pack__
client_keys,
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dask/highlevelgraph.py",
line 392, in __dask_distributed_pack__
dsk = toolz.valmap(dumps_task, dsk)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/toolz/dicttoolz.py",
line 83, in valmap
rv.update(zip(d.keys(), map(func, d.values())))
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3563, in dumps_task
return {"function": dumps_function(task[0]), "args":
warn_dumps(task[1:])}
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/worker.py",
line 3527, in dumps_function
result = pickle.dumps(func, protocol=4)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py",
line 60, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 101, in dumps
cp.dump(obj)
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 540, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py",
line 659, in _save_reduce_pickle5
dictitems=dictitems, obj=obj
File "/usr/lib/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File
"/home/airflow/test_environments/venv/lib/python3.7/site-packages/dill/_dill.py",
line 1226, in save_cell
f = obj.cell_contents
ValueError: Cell is empty
```
Looking at the traceback, the failure seems to start in
`DaskExecutor.execute_async` (dask_executor.py, line 84): the locally defined
`airflow_run` function cannot be serialized when it is submitted to the Dask
client (`Can't pickle local object` / `ValueError: Cell is empty`), and the
resulting exception propagates up through `_run_scheduler_loop` in
scheduler_job.py. The data models themselves look consistent; I've run
`airflow db upgrade` without any errors.
I hope this helps to resolve the hopefully trivial issue.
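For anyone reproducing this outside of Airflow: the first error in the chain comes from the stdlib pickler, which refuses any function defined inside another function. Below is a minimal, hypothetical sketch of that behaviour; `make_local_runner` and its nested `airflow_run` are stand-ins for the real executor code, not the actual Airflow implementation (the subsequent `Cell is empty` failure comes from cloudpickle dispatching into dill's cell reducer and is a separate issue).

```python
import pickle

def make_local_runner():
    # Stand-in for DaskExecutor.execute_async, which defines
    # airflow_run as a nested function closing over local state.
    command = ["airflow", "tasks", "run"]

    def airflow_run():
        return command

    return airflow_run

# The stdlib pickler rejects any locally defined function, which matches
# the "Can't pickle local object" line in the traceback above.
try:
    pickle.dumps(make_local_runner())
except AttributeError as exc:
    print(exc)
```

A common fix for this class of problem is to submit a module-level function and pass the command as an argument, so nothing local needs to be serialized.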
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]