That's disappointing. The promise of being able to deploy code just to the
Airflow master, and have that automatically propagated to workers, was a
major selling point for us when we chose Airflow over its alternatives - it
would greatly simplify our deploy tooling, and free us from having to worry
about DAG definitions getting out of sync between the master and workers.

Perhaps the cloudpickle library, which came out of PySpark, could help
here: https://github.com/cloudpipe/cloudpickle. It appears to be
specifically designed for shipping Python code over a network.
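
Rough sketch of the idea (untested, and fetch_orgmast_task here is just a
hypothetical stand-in for a callable defined in one of our DAG files):
cloudpickle serializes a function defined in __main__ by value, capturing its
code object rather than a reference to its defining module, so the receiving
side wouldn't need that module to be importable.

import pickle
import cloudpickle

def fetch_orgmast_task():
    # stand-in for a callable defined in a DAG file the worker can't import
    return "fetched"

# Sender: the function lives in __main__, so cloudpickle pickles it by value
blob = cloudpickle.dumps(fetch_orgmast_task)

# Receiver: plain pickle can load it without importing the original module
restored = pickle.loads(blob)
print(restored())  # prints "fetched"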

Alek

On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <[email protected]> wrote:

> FYI: there's been talk of deprecating pickling altogether, as it's very
> brittle.
>
> Max
>
> On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:
>
> > Can anyone help with this? Has anyone successfully used Airflow with
> > pickling turned on that can give details on their setup?
> >
> > Thanks,
> > Alek
> >
> > On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
> >
> > > Yes, everything's correctly imported - everything works fine when I run
> > > the scheduler without pickling turned on.
> > >
> > > Thanks,
> > > Alek
> > >
> > > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:
> > >
> > >> The relevant part seems to be:
> > >>
> > >> ImportError: No module named
> > >> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > >> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> > >> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> > >> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> > >> non-zero exit status 1
> > >>
> > >> Did you check that your task and script `fetch_orgmast.py` are correctly
> > >> importing all modules that they use?
> > >>
> > >> Cheers,
> > >> Edgar
> > >>
> > >> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > When running the scheduler as airflow scheduler -p and the worker (on a
> > >> > different box) as airflow worker -q foo, I get the following exception in
> > >> > the worker process when triggering a DAG run manually:
> > >> >
> > >> > Traceback (most recent call last):
> > >> >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> > >> >     args.func(args)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > airflow/bin/cli.py",
> > >> > line 393, in run
> > >> >     DagPickle).filter(DagPickle.id == args.pickle).first()
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/query.py",
> > >> > line 2755, in first
> > >> >     ret = list(self[0:1])
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/query.py",
> > >> > line 2547, in __getitem__
> > >> >     return list(res)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/loading.py",
> > >> > line 90, in instances
> > >> >     util.raise_from_cause(err)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/util/compat.py",
> > >> > line 203, in raise_from_cause
> > >> >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/loading.py",
> > >> > line 75, in instances
> > >> >     rows = [proc(row) for row in fetch]
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/loading.py",
> > >> > line 437, in _instance
> > >> >     loaded_instance, populate_existing, populators)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/orm/loading.py",
> > >> > line 498, in _populate_full
> > >> >     dict_[key] = getter(row)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > sqlalchemy/sql/sqltypes.py",
> > >> > line 1540, in process
> > >> >     return loads(value)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > dill/dill.py",
> > >> > line 299, in loads
> > >> >     return load(file)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > dill/dill.py",
> > >> > line 288, in load
> > >> >     obj = pik.load()
> > >> >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> > >> >     dispatch[key](self)
> > >> >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> > >> >     klass = self.find_class(module, name)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > dill/dill.py",
> > >> > line 445, in find_class
> > >> >     return StockUnpickler.find_class(self, module, name)
> > >> >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> > >> >     __import__(module)
> > >> > ImportError: No module named
> > >> > unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fb
> a16effe386_fetch_orgmast
> > >> > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> > >> > fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> > >> > --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> > >> > non-zero exit status 1
> > >> > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
> > >> > airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
> > >> > raised unexpected: AirflowException('Celery command failed',)
> > >> > Traceback (most recent call last):
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > celery/app/trace.py",
> > >> > line 374, in trace_task
> > >> >     R = retval = fun(*args, **kwargs)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > celery/app/trace.py",
> > >> > line 629, in __protected_call__
> > >> >     return self.run(*args, **kwargs)
> > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/
> > >> > airflow/executors/celery_executor.py",
> > >> > line 62, in execute_command
> > >> >     raise AirflowException('Celery command failed')
> > >> > AirflowException: Celery command failed
> > >> >
> > >> > I’m using the CeleryExecutor, with Postgres 9.6 for both the Airflow data
> > >> > and the Celery result backend. Python version is 2.7.6. What am I doing
> > >> > wrong?
> > >> >
> > >> > Alek
> > >> >
> > >>
> > >
> > >
> >
>
