Hi all,
When running the scheduler as airflow scheduler -p and the worker (on a
different box) as airflow worker -q foo, I get the following exception in
the worker process when triggering a DAG run manually:
Traceback (most recent call last):
File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
args.func(args)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py",
line 393, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2755, in first
ret = list(self[0:1])
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2547, in __getitem__
return list(res)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 90, in instances
util.raise_from_cause(err)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
line 203, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 75, in instances
rows = [proc(row) for row in fetch]
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 437, in _instance
loaded_instance, populate_existing, populators)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 498, in _populate_full
dict_[key] = getter(row)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py",
line 1540, in process
return loads(value)
File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 299, in loads
return load(file)
File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 288, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 445, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named
unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
[2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
--local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
non-zero exit status 1
[2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
line 374, in trace_task
R = retval = fun(*args, **kwargs)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
line 629, in __protected_call__
return self.run(*args, **kwargs)
File
"/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py",
line 62, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed
I’m using the CeleryExecutor and using Postgres 9.6 for both Airflow data
and the Celery result backend. Python version is 2.7.6. What am I doing
wrong?
Alek