I looked over the cloudpickle library that Alek mentioned (https://github.com/cloudpipe/cloudpickle). Looks cool - thanks for referencing it. It could be a drop-in replacement for pickle and low-hanging fruit for fixing this issue. I don't see a problem with storing cloudpickled objects in a database table's blob field either. Thanks.
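To illustrate what I mean, here's a minimal sketch of cloudpickle as a drop-in for pickle.dumps()/loads(), with the bytes round-tripped through a blob column. The table and column names are made up, and an in-memory sqlite database stands in for the real metadata DB - this is just to show the shape of it, not how Airflow actually stores pickles:

import sqlite3

import cloudpickle  # pip install cloudpickle


def make_callback(greeting):
    # Closures/lambdas like this are what stock pickle chokes on;
    # cloudpickle serializes them by value.
    return lambda name: "{}, {}!".format(greeting, name)


# Same interface as pickle.dumps()
blob = cloudpickle.dumps(make_callback("Hello"))

# Store and fetch the bytes through a blob field (hypothetical table/column names)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_pickle (id INTEGER PRIMARY KEY, pickle BLOB)")
conn.execute("INSERT INTO dag_pickle (pickle) VALUES (?)", (sqlite3.Binary(blob),))

row = conn.execute("SELECT pickle FROM dag_pickle").fetchone()
callback = cloudpickle.loads(bytes(row[0]))
print(callback("Airflow"))  # -> Hello, Airflow!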
--
Ruslan Dautkhanov

On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <[email protected]> wrote:

> One issue that's been standing in the way is the fact that Jinja template objects are not pickleable. That, and the fact that when people pass objects into their DAG objects (through params, callbacks or whatever other ways), the serialization can get tangled and pickles become gigantic. People typically don't understand the implications in that context.
>
> For now there's a workaround for the Jinja template pickling issue that limits what you can do with Jinja (you'll notice that extends and imports just won't work in a pickle/remote setting).
>
> I remember spending a few hours trying to pickle Jinja templates in the first weeks of the project and ultimately giving up. I'm sure someone could get that working.
>
> Here's another related question: is the database a proper transport layer for pickles? It feels like a hack to me...
>
> Another idea that was discussed was to create a new BaseDagFetcher abstraction, along with a replacement for the current implementation: FilesystemDagFetcher. Then people could write/use whatever other implementations like S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher, ...
>
> Max
>
> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <[email protected]> wrote:
>
> > That's disappointing. The promise of being able to deploy code just to the Airflow master, and have that automatically propagated to workers, was a major selling point for us when we chose Airflow over its alternatives - it would greatly simplify our deploy tooling, and free us from having to worry about DAG definitions getting out of sync between the master and workers.
> >
> > Perhaps the cloudpickle library, which came out of PySpark, could help here: https://github.com/cloudpipe/cloudpickle. It appears to be specifically designed for shipping Python code over a network.
> >
> > Alek
> >
> > On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <[email protected]> wrote:
> >
> > > FYI: there's been talk of deprecating pickling altogether as it's very brittle.
> > >
> > > Max
> > >
> > > On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:
> > >
> > > > Can anyone help with this? Has anyone successfully used Airflow with pickling turned on who can give details on their setup?
> > > >
> > > > Thanks,
> > > > Alek
> > > >
> > > > On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
> > > >
> > > > > Yes, everything's correctly imported - everything works fine when I run the scheduler without pickling turned on.
> > > > >
> > > > > Thanks,
> > > > > Alek
> > > > >
> > > > > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:
> > > > >
> > > > > > The relevant part seems to be:
> > > > > >
> > > > > > ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > > > > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> > > > > >
> > > > > > Did you check that your task and script `fetch_orgmast.py` are correctly importing all modules that they use?
> > > > > >
> > > > > > Cheers,
> > > > > > Edgar
> > > > > >
> > > > > > On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > When running the scheduler as airflow scheduler -p and the worker (on a different box) as airflow worker -q foo, I get the following exception in the worker process when triggering a DAG run manually:
> > > > > > >
> > > > > > > Traceback (most recent call last):
> > > > > > >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> > > > > > >     args.func(args)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> > > > > > >     DagPickle).filter(DagPickle.id == args.pickle).first()
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> > > > > > >     ret = list(self[0:1])
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> > > > > > >     return list(res)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> > > > > > >     util.raise_from_cause(err)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> > > > > > >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> > > > > > >     rows = [proc(row) for row in fetch]
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> > > > > > >     loaded_instance, populate_existing, populators)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> > > > > > >     dict_[key] = getter(row)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> > > > > > >     return loads(value)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> > > > > > >     return load(file)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> > > > > > >     obj = pik.load()
> > > > > > >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> > > > > > >     dispatch[key](self)
> > > > > > >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> > > > > > >     klass = self.find_class(module, name)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> > > > > > >     return StockUnpickler.find_class(self, module, name)
> > > > > > >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> > > > > > >     __import__(module)
> > > > > > > ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > > > > > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> > > > > > > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
> > > > > > > Traceback (most recent call last):
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> > > > > > >     R = retval = fun(*args, **kwargs)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> > > > > > >     return self.run(*args, **kwargs)
> > > > > > >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> > > > > > >     raise AirflowException('Celery command failed')
> > > > > > > AirflowException: Celery command failed
> > > > > > >
> > > > > > > I'm using the CeleryExecutor and using Postgres 9.6 for both Airflow data and the Celery result backend. Python version is 2.7.6. What am I doing wrong?
> > > > > > >
> > > > > > > Alek
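P.S. On the BaseDagFetcher idea Max floats in the quoted thread above: nothing like this exists in Airflow today, so take the following as a very rough sketch only. The class names come from Max's email; the method names and signatures are purely my guess.

import os
from abc import ABCMeta, abstractmethod


class BaseDagFetcher(object):
    """Resolves a DAG id to a local file path that a worker can import."""
    __metaclass__ = ABCMeta  # Python 2 style, matching the 2.7 setup in this thread

    @abstractmethod
    def fetch(self, dag_id):
        """Return a local filesystem path holding the DAG definition."""
        raise NotImplementedError


class FilesystemDagFetcher(BaseDagFetcher):
    """Roughly today's behavior: DAG files already sit on the worker's disk."""

    def __init__(self, dags_folder):
        self.dags_folder = dags_folder

    def fetch(self, dag_id):
        # Naively assumes the file is named after the dag_id.
        return os.path.join(self.dags_folder, "{}.py".format(dag_id))


# S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher, etc. would download or
# unpickle the DAG definition and hand back a local path in the same way.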
