Hi all,

When I run the scheduler as "airflow scheduler -p" and the worker (on a
different box) as "airflow worker -q foo", I get the following exception in
the worker process when I trigger a DAG run manually:

Traceback (most recent call last):
  File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
    args.func(args)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
    util.raise_from_cause(err)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
    rows = [proc(row) for row in fetch]
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
    loaded_instance, populate_existing, populators)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
    dict_[key] = getter(row)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
    return loads(value)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
    return load(file)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
[2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
[2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed

I’m using the CeleryExecutor, with Postgres 9.6 for both the Airflow data
and the Celery result backend. The Python version is 2.7.6. What am I doing
wrong?

Alek
