FYI: there have been talks of deprecating pickling altogether, as it's very brittle.

Max

On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:
> Can anyone help with this? Has anyone successfully used Airflow with
> pickling turned on who can give details on their setup?
>
> Thanks,
> Alek
>
> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
>
> > Yes, everything's correctly imported - everything works fine when I run
> > the scheduler without pickling turned on.
> >
> > Thanks,
> > Alek
> >
> > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez
> > <[email protected]> wrote:
> >
> >> The relevant part seems to be:
> >>
> >> ImportError: No module named
> >> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> >> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> >> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> >> non-zero exit status 1
> >>
> >> Did you check that your task and script `fetch_orgmast.py` are correctly
> >> importing all modules that they use?
> >>
> >> Cheers,
> >> Edgar
> >>
> >> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
> >>
> >> > Hi all,
> >> >
> >> > When running the scheduler as `airflow scheduler -p` and the worker (on a
> >> > different box) as `airflow worker -q foo`, I get the following exception
> >> > in the worker process when triggering a DAG run manually:
> >> >
> >> > Traceback (most recent call last):
> >> >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> >> >     args.func(args)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> >> >     DagPickle).filter(DagPickle.id == args.pickle).first()
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> >> >     ret = list(self[0:1])
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> >> >     return list(res)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> >> >     util.raise_from_cause(err)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> >> >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> >> >     rows = [proc(row) for row in fetch]
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> >> >     loaded_instance, populate_existing, populators)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> >> >     dict_[key] = getter(row)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> >> >     return loads(value)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> >> >     return load(file)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> >> >     obj = pik.load()
> >> >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> >> >     dispatch[key](self)
> >> >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> >> >     klass = self.find_class(module, name)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> >> >     return StockUnpickler.find_class(self, module, name)
> >> >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> >> >     __import__(module)
> >> > ImportError: No module named
> >> > unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >> > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> >> > fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> >> > --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> >> > non-zero exit status 1
> >> > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
> >> > airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
> >> > raised unexpected: AirflowException('Celery command failed',)
> >> > Traceback (most recent call last):
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> >> >     R = retval = fun(*args, **kwargs)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> >> >     return self.run(*args, **kwargs)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> >> >     raise AirflowException('Celery command failed')
> >> > AirflowException: Celery command failed
> >> >
> >> > I’m using the CeleryExecutor and Postgres 9.6 for both the Airflow
> >> > metadata database and the Celery result backend. Python version is
> >> > 2.7.6. What am I doing wrong?
> >> >
> >> > Alek
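[Editor's note] The ImportError in the traceback is the classic by-reference pickling failure: functions and classes are serialized as a module name plus attribute path, so the unpickling side (the Celery worker) must be able to import the very same module — here the `unusual_prefix_..._fetch_orgmast` module name Airflow generates when loading the DAG file. A minimal sketch of that failure mode, using stdlib `pickle` rather than `dill` (dill delegates module lookups to the same pickle machinery, as the traceback shows); the module name below is made up for illustration:

```python
import pickle
import sys
import types

# Simulate a module that exists only on the "scheduler" side.
mod = types.ModuleType("unusual_prefix_example_dag")
exec("def task():\n    return 'ok'", mod.__dict__)
sys.modules[mod.__name__] = mod

# Pickling stores only the reference "unusual_prefix_example_dag.task",
# not the function's code.
data = pickle.dumps(mod.task)

# The "worker" side has no such module on its path.
del sys.modules[mod.__name__]

try:
    pickle.loads(data)  # find_class -> __import__(module) -> fails
except ImportError as exc:
    print("unpickle failed:", exc)
```

This is why the usual fixes are either to make the DAG file importable under the same module path on the workers, or to avoid pickling and ship the DAG files to every worker (the `-sd` flag in the failing command already points at the file, which is what runs when pickling is off).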
