Can anyone help with this? Has anyone successfully used Airflow with pickling turned on who can share details of their setup?
Thanks,
Alek

On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
> Yes, everything's correctly imported - everything works fine when I run
> the scheduler without pickling turned on.
>
> Thanks,
> Alek
>
> On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:
>> The relevant part seems to be:
>>
>>   ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>   [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>
>> Did you check that your task and script `fetch_orgmast.py` are correctly
>> importing all modules that they use?
>>
>> Cheers,
>> Edgar
>>
>> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
>>> Hi all,
>>>
>>> When running the scheduler as `airflow scheduler -p` and the worker (on a
>>> different box) as `airflow worker -q foo`, I get the following exception
>>> in the worker process when triggering a DAG run manually:
>>>
>>> Traceback (most recent call last):
>>>   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
>>>     args.func(args)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
>>>     DagPickle).filter(DagPickle.id == args.pickle).first()
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
>>>     ret = list(self[0:1])
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
>>>     return list(res)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
>>>     util.raise_from_cause(err)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
>>>     rows = [proc(row) for row in fetch]
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
>>>     loaded_instance, populate_existing, populators)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
>>>     dict_[key] = getter(row)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
>>>     return loads(value)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>>>     return load(file)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>>>     obj = pik.load()
>>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>>     dispatch[key](self)
>>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>>     klass = self.find_class(module, name)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>>     __import__(module)
>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>> [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
>>> Traceback (most recent call last):
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
>>>     R = retval = fun(*args, **kwargs)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
>>>     return self.run(*args, **kwargs)
>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
>>>     raise AirflowException('Celery command failed')
>>> AirflowException: Celery command failed
>>>
>>> I'm using the CeleryExecutor, with Postgres 9.6 for both the Airflow
>>> data and the Celery result backend. Python version is 2.7.6. What am I
>>> doing wrong?
>>>
>>> Alek
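[Editor's note] For readers hitting the same ImportError: the `unusual_prefix_<hash>_<file>` name comes from Airflow's DAG-file loader, which imports each DAG file under a mangled, per-process module name. Pickle (and dill, for plain functions) serializes a function as a (module, qualname) reference rather than by value, so the unpickling process must be able to import that exact module name - which a fresh Celery worker cannot. The failure mode can be reproduced outside Airflow with a minimal sketch; the module name below is made up for illustration and the snippet uses the stdlib `pickle` rather than Airflow's dill, but the reference-based lookup it demonstrates is the same:

```python
import pickle
import sys
import types

# Simulate a loader that execs a DAG file under a mangled module name,
# roughly what Airflow's DagBag does (hash and name are hypothetical).
source = "def task():\n    return 'ok'\n"
module_name = "unusual_prefix_deadbeef_fetch_orgmast"

module = types.ModuleType(module_name)
exec(source, module.__dict__)          # task.__module__ == module_name
sys.modules[module_name] = module

# Pickling stores a (module, qualname) reference, not the function body.
payload = pickle.dumps(module.task)

# Same process: the mangled module is in sys.modules, so this works.
same_process_result = pickle.loads(payload)()

# A fresh worker process never imported that module; simulate it by
# removing the entry, so find_class's __import__ has nothing to find.
del sys.modules[module_name]
error = None
try:
    pickle.loads(payload)
except ImportError as exc:  # ModuleNotFoundError subclasses ImportError
    error = exc
print("unpickle failed:", error)
```

This is why the worker's traceback ends in `__import__(module)`: the DagPickle row deserializes fine up to the point where dill must resolve the mangled module name, which exists only in the scheduler process that created it.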
