Thanks Bolke. Good points on pickling vs. an external API. Here's a bit on cloudpickle performance comparisons:
https://github.com/cloudpipe/cloudpickle/issues/58
https://github.com/cloudpipe/cloudpickle/issues/44
https://github.com/RaRe-Technologies/gensim/issues/558

DAGs aren't normally large enough to be much of a memory/performance concern, are they? (At least not in the cases we work with.)
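For anyone who hasn't used it, here's a minimal sketch of what cloudpickle buys over stock pickle (assuming cloudpickle's documented dumps API; the helper names are made up for illustration):

    import pickle
    import cloudpickle

    def make_task(n):
        # A closure like this is exactly what stock pickle refuses to
        # serialize (AttributeError: Can't pickle local object ...).
        def task():
            return n * 2
        return task

    task = make_task(21)

    blob = cloudpickle.dumps(task)  # serializes the function body by value
    restored = pickle.loads(blob)   # plain pickle can read cloudpickle output
    print(restored())               # -> 42

The key property is that functions and closures are shipped by value rather than by module reference, which is exactly what breaks in the traceback further down this thread.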
Best regards,
Ruslan

On Mon, Oct 23, 2017 at 11:59 AM, Bolke de Bruin <[email protected]> wrote:

But cloudpickle looks promising. What's the speed, and what are the memory requirements?

Bolke

On 23 Oct 2017 at 19:57, Bolke de Bruin <[email protected]> wrote:

The other option is to use zipped DAGs and have those picked up by the workers from the API. This is less error-prone than pickling (marshmallow, cloudpickle). I have a working prototype for this, but it needs to be updated to the current Airflow.

Another option is to use copyreg for fields that are difficult to serialize, and apply that to the Jinja2 template fields. That would make them pickleable.

But I think we should deprecate pickling altogether and move over to something external/API-based.

Cheers,
Bolke
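For reference, a minimal sketch of the copyreg approach (the TemplateHolder wrapper and its reducer are illustrative names only, not Airflow code; it assumes you keep the template source string around so the template can be recompiled on unpickle; on Python 2 the module is spelled copy_reg):

    import copyreg
    import pickle
    from jinja2 import Template

    class TemplateHolder(object):
        """Pairs a compiled Jinja2 template with the source it came from."""
        def __init__(self, source):
            self.source = source
            self.template = Template(source)

    def _reduce_holder(holder):
        # Don't pickle the compiled template; rebuild it from source on load.
        return (TemplateHolder, (holder.source,))

    copyreg.pickle(TemplateHolder, _reduce_holder)

    holder = pickle.loads(pickle.dumps(TemplateHolder("Hello {{ name }}!")))
    print(holder.template.render(name="Airflow"))  # -> Hello Airflow!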
On 23 Oct 2017 at 19:35, Ruslan Dautkhanov <[email protected]> wrote:

I looked over cloudpickle that Alek mentioned. Looks cool - thanks for referencing that.
https://github.com/cloudpipe/cloudpickle
It could be a drop-in replacement for pickle and a low-hanging fruit to fix this issue. I don't see an issue with storing cloudpickled objects in a database table's blob field.
Thanks.

--
Ruslan Dautkhanov

On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <[email protected]> wrote:

One issue that's been standing in the way is the fact that Jinja template objects are not pickleable. That, and the fact that when people pass objects into their DAG objects (through params, callbacks, or whatever other ways), the serialization can get tangled and pickles become gigantic. People typically don't understand the implications in that context.

For now there's a workaround for the Jinja template pickling issue that limits what you can do with Jinja (you'll notice that extends and imports just won't work in a pickle/remote setting).

I remember spending a few hours trying to pickle Jinja templates in the first weeks of the project and ultimately giving up. I'm sure someone could get that working.

Here's another related question: is the database a proper transport layer for pickles? It feels like a hack to me...

Another idea that was discussed was to create a new BaseDagFetcher abstraction, along with a replacement for the current implementation: FilesystemDagFetcher. Then people could write/use whatever other implementations, like S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher, ....

Max
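A rough sketch of what that abstraction could look like (only BaseDagFetcher and FilesystemDagFetcher are named in the thread; the method names and signatures below are guesses for illustration):

    import abc
    import os

    class BaseDagFetcher(abc.ABC):
        """Fetches DAG definition files from some backing store."""

        @abc.abstractmethod
        def fetch(self, dag_id):
            """Return a local filesystem path to the file defining dag_id."""

    class FilesystemDagFetcher(BaseDagFetcher):
        """The current behavior: DAG files already live on local disk."""

        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id):
            # Hypothetical convention: one file per DAG, named after its id.
            return os.path.join(self.dags_folder, dag_id + ".py")

An S3ZipDagFetcher would then just download and unpack an archive into a temporary directory before returning the path.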
On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <[email protected]> wrote:

That's disappointing. The promise of being able to deploy code just to the Airflow master, and have that automatically propagated to the workers, was a major selling point for us when we chose Airflow over its alternatives - it would greatly simplify our deploy tooling, and free us from having to worry about DAG definitions getting out of sync between the master and workers.

Perhaps the cloudpickle library, which came out of PySpark, could help here: https://github.com/cloudpipe/cloudpickle. It appears to be specifically designed for shipping Python code over a network.

Alek

On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <[email protected]> wrote:

FYI: there's been talk of deprecating pickling altogether, as it's very brittle.

Max

On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:

Can anyone help with this? Has anyone successfully used Airflow with pickling turned on who can give details on their setup?

Thanks,
Alek

On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:

Yes, everything's correctly imported - everything works fine when I run the scheduler without pickling turned on.

Thanks,
Alek

On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:

The relevant part seems to be:

    ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
    [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1

Did you check that your task and script `fetch_orgmast.py` are correctly importing all modules that they use?

Cheers,
Edgar

On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:

Hi all,

When running the scheduler as `airflow scheduler -p` and the worker (on a different box) as `airflow worker -q foo`, I get the following exception in the worker process when triggering a DAG run manually:

    Traceback (most recent call last):
      File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
        args.func(args)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
        DagPickle).filter(DagPickle.id == args.pickle).first()
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
        ret = list(self[0:1])
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
        return list(res)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
        util.raise_from_cause(err)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
        reraise(type(exception), exception, tb=exc_tb, cause=cause)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
        rows = [proc(row) for row in fetch]
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
        loaded_instance, populate_existing, populators)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
        dict_[key] = getter(row)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
        return loads(value)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
        return load(file)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
        obj = pik.load()
      File "/usr/lib/python2.7/pickle.py", line 858, in load
        dispatch[key](self)
      File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
        klass = self.find_class(module, name)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
        return StockUnpickler.find_class(self, module, name)
      File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
        __import__(module)
    ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
    [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
    [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
    Traceback (most recent call last):
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
        return self.run(*args, **kwargs)
      File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
        raise AirflowException('Celery command failed')
    AirflowException: Celery command failed

I'm using the CeleryExecutor, with Postgres 9.6 for both the Airflow metadata database and the Celery result backend. The Python version is 2.7.6. What am I doing wrong?

Alek
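The mangled name in that ImportError reflects how Airflow loads DAG files: each file is imported under a generated module name (the unusual_prefix_... seen above), and pickle records functions and classes by module path, so a worker process that never imported that exact module name cannot unpickle them. A minimal sketch of the failure mode (the module and function names here are made up):

    import pickle
    import sys
    import types

    # Simulate a DAG file being imported under a generated module name.
    mod = types.ModuleType("unusual_prefix_demo")
    exec("def task_fn():\n    return 42", mod.__dict__)
    sys.modules["unusual_prefix_demo"] = mod

    # pickle stores only the reference 'unusual_prefix_demo.task_fn' ...
    blob = pickle.dumps(mod.task_fn)

    # ... so a process that never created that module cannot load it.
    # Simulate such a process by dropping the module:
    del sys.modules["unusual_prefix_demo"]
    pickle.loads(blob)  # ImportError: No module named 'unusual_prefix_demo'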
