But cloudpickle looks promising. What are the speed and memory requirements?

Bolke

Sent from my iPad
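(A minimal sketch of the kind of measurement Bolke is asking about, assuming only that cloudpickle is installed; the make_task callable below is a toy stand-in, not Airflow's actual DAG/operator pickling path.)

    # Measure cloudpickle payload size and dump time for a small closure.
    # Plain pickle cannot serialize a closure like this at all; cloudpickle
    # ships the function body itself.
    import time
    import pickle
    import cloudpickle  # pip install cloudpickle

    def make_task(table):
        def run(ds):
            return "INSERT INTO %s SELECT * FROM staging WHERE ds = '%s'" % (table, ds)
        return run

    task = make_task("orgmast")

    start = time.time()
    payload = cloudpickle.dumps(task)
    elapsed = time.time() - start
    print("payload: %d bytes, dumped in %.6fs" % (len(payload), elapsed))

    # Stock pickle can load the payload, as long as cloudpickle is
    # importable on the loading side (e.g. on the worker).
    restored = pickle.loads(payload)
    print(restored("2017-10-07"))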
> On 23 Oct 2017, at 19:57, Bolke de Bruin <[email protected]> wrote:
>
> The other option is to use zipped DAGs and have those picked up by the
> workers from the API. This is less error prone than pickling (marshmallow,
> cloudpickle). I have a working prototype for this, but it needs to be
> updated to the current Airflow.
>
> Another option is to use copyreg for fields that are difficult to serialize
> and apply that to the jinja2 template fields. This allows one to pickle.
>
> But I think we should deprecate pickling altogether and move over to
> something external/API-based.
>
> Cheers
> Bolke
>
> Sent from my iPad
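(A rough sketch of the copyreg approach Bolke mentions: register a reducer so jinja2 templates are pickled as their source string and recompiled on load. The make_template helper and the _source attribute are invented for illustration; jinja2.Template does not keep its source around by itself, so real code would have to stash it somewhere.)

    # Pickle jinja2 templates by source string via a copyreg-registered reducer.
    import copyreg   # "copy_reg" on Python 2
    import pickle
    import jinja2

    def make_template(source):
        tmpl = jinja2.Template(source)
        tmpl._source = source        # hypothetical: remember the source for pickling
        return tmpl

    def reduce_template(tmpl):
        # Rebuild from source on unpickle instead of serializing compiled code.
        return make_template, (tmpl._source,)

    copyreg.pickle(jinja2.Template, reduce_template)

    t = make_template("SELECT * FROM t WHERE ds = '{{ ds }}'")
    restored = pickle.loads(pickle.dumps(t))
    print(restored.render(ds="2017-10-07"))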
>> On 23 Oct 2017, at 19:35, Ruslan Dautkhanov <[email protected]> wrote:
>>
>> I looked over cloudpickle that Alek mentioned. Looks cool - thanks for
>> referencing that.
>> https://github.com/cloudpipe/cloudpickle
>> It could be a drop-in replacement for pickle and a low-hanging fruit to fix
>> this issue.
>> I don't see storing cloudpickled objects in a database table's blob field
>> being an issue.
>> Thanks.
>>
>> --
>> Ruslan Dautkhanov
>>
>> On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <[email protected]> wrote:
>>
>>> One issue that's been standing in the way is the fact that Jinja template
>>> objects are not pickleable. That, and the fact that when people pass
>>> objects into their DAG objects (through params, callbacks, or whatever
>>> other ways), the serialization can get tangled and the pickles become
>>> gigantic. People typically don't understand the implications in that
>>> context.
>>>
>>> For now there's a workaround for the Jinja template pickling issue that
>>> limits what you can do with Jinja (you'll notice that extends and imports
>>> just won't work in a pickle/remote setting).
>>>
>>> I remember spending a few hours trying to pickle Jinja templates in the
>>> first weeks of the project and ultimately giving up. I'm sure someone
>>> could get that working.
>>>
>>> Here's another related question: is the database a proper transport layer
>>> for pickles? It feels like a hack to me...
>>>
>>> Another idea that was discussed was to create a new BaseDagFetcher
>>> abstraction, along with a replacement for the current implementation:
>>> FilesystemDagFetcher. Then people could write/use whatever other
>>> implementations they like: S3ZipDagFetcher, HDFSDagFetcher,
>>> PickleDagFetcher, ...
>>>
>>> Max
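(A rough sketch of the BaseDagFetcher abstraction Max outlines above; none of these classes exist in Airflow, and the class/method names and signatures are invented for illustration only.)

    # Hypothetical DAG-fetcher abstraction: each implementation makes DAG
    # source available on the local filesystem for the DagBag to load.
    import os

    class BaseDagFetcher(object):
        """Makes a DAG definition available locally so the DagBag can load it."""

        def fetch(self, dag_id):
            """Return a local filesystem path containing the DAG's definition."""
            raise NotImplementedError()

    class FilesystemDagFetcher(BaseDagFetcher):
        """Roughly today's behaviour: DAG files already sit on local disk."""

        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id):
            return os.path.join(self.dags_folder, dag_id + ".py")

    # S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher, etc. would download,
    # unzip, or unpickle into a temp directory and return that path instead.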
>>> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <[email protected]> wrote:
>>>
>>>> That's disappointing. The promise of being able to deploy code just to
>>>> the Airflow master, and have it automatically propagated to the workers,
>>>> was a major selling point for us when we chose Airflow over its
>>>> alternatives - it would greatly simplify our deploy tooling, and free us
>>>> from having to worry about DAG definitions getting out of sync between
>>>> the master and the workers.
>>>>
>>>> Perhaps the cloudpickle library, which came out of PySpark, could help
>>>> here: https://github.com/cloudpipe/cloudpickle. It appears to be
>>>> specifically designed for shipping Python code over a network.
>>>>
>>>> Alek
>>>>
>>>> On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <[email protected]> wrote:
>>>>
>>>>> FYI: there has been talk of deprecating pickling altogether, as it's
>>>>> very brittle.
>>>>>
>>>>> Max
>>>>>
>>>>> On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:
>>>>>
>>>>>> Can anyone help with this? Has anyone successfully used Airflow with
>>>>>> pickling turned on who can give details on their setup?
>>>>>>
>>>>>> Thanks,
>>>>>> Alek
>>>>>>
>>>>>> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, everything is correctly imported - everything works fine when I
>>>>>>> run the scheduler without pickling turned on.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Alek
>>>>>>>
>>>>>>> On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:
>>>>>>>
>>>>>>>> The relevant part seems to be:
>>>>>>>>
>>>>>>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>>>>>>>
>>>>>>>> Did you check that your task and script `fetch_orgmast.py` are
>>>>>>>> correctly importing all the modules that they use?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Edgar
>>>>>>>>
>>>>>>>> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> When running the scheduler as `airflow scheduler -p` and the worker
>>>>>>>>> (on a different box) as `airflow worker -q foo`, I get the following
>>>>>>>>> exception in the worker process when triggering a DAG run manually:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
>>>>>>>>>     args.func(args)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
>>>>>>>>>     DagPickle).filter(DagPickle.id == args.pickle).first()
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
>>>>>>>>>     ret = list(self[0:1])
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
>>>>>>>>>     return list(res)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
>>>>>>>>>     util.raise_from_cause(err)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>>>>>>>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
>>>>>>>>>     rows = [proc(row) for row in fetch]
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
>>>>>>>>>     loaded_instance, populate_existing, populators)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
>>>>>>>>>     dict_[key] = getter(row)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
>>>>>>>>>     return loads(value)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>>>>>>>>>     return load(file)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>>>>>>>>     dispatch[key](self)
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>>>>>>>>     klass = self.find_class(module, name)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>>>>>>>>     __import__(module)
>>>>>>>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>>>>>>>> [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
>>>>>>>>>     R = retval = fun(*args, **kwargs)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
>>>>>>>>>     return self.run(*args, **kwargs)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
>>>>>>>>>     raise AirflowException('Celery command failed')
>>>>>>>>> AirflowException: Celery command failed
>>>>>>>>>
>>>>>>>>> I'm using the CeleryExecutor, with Postgres 9.6 for both the Airflow
>>>>>>>>> metadata database and the Celery result backend. The Python version
>>>>>>>>> is 2.7.6. What am I doing wrong?
>>>>>>>>>
>>>>>>>>> Alek
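(For reference, a rough sketch of where that ImportError comes from, assuming Airflow's DagBag loads each DAG file under a generated module name of the form unusual_prefix_<sha1>_<filename>, which is what the traceback suggests; the exact hashing details are approximate.)

    # How a DAG file ends up under a synthetic module name (approximate).
    import imp        # Python 2-era loader, matching the thread's Python 2.7
    import hashlib

    filepath = "/site/conf/airflow/dags/data/fetch_orgmast.py"
    mod_name = "unusual_prefix_%s_%s" % (hashlib.sha1(filepath.encode()).hexdigest(),
                                         "fetch_orgmast")
    module = imp.load_source(mod_name, filepath)

    # Objects defined in that file get pickled with a reference to this
    # synthetic module; on the worker, pickle's find_class() does
    # __import__("unusual_prefix_..."), no such module is importable there,
    # and the ImportError shown above is raised.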
