But cloudpickle looks promising. What are its speed and memory requirements?
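
A quick way to get ballpark numbers ourselves (just a sketch - the sample
payload is made up, and real DAGs will obviously behave differently):

    from __future__ import print_function  # we're on Python 2.7
    import pickle
    import timeit

    import cloudpickle

    payload = {"params": list(range(10000)), "callback": lambda x: x + 1}

    blob = cloudpickle.dumps(payload)  # plain pickle would choke on the lambda
    print("size (bytes):", len(blob))
    print("dumps (s):", timeit.timeit(lambda: cloudpickle.dumps(payload), number=100) / 100)
    # cloudpickle emits a standard pickle stream, so stdlib pickle can load it
    print("loads (s):", timeit.timeit(lambda: pickle.loads(blob), number=100) / 100)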

Bolke

Sent from my iPad

> On 23 Oct 2017, at 19:57, Bolke de Bruin <[email protected]> wrote:
> 
> The other option is to use zipped DAGs and have those picked up by the 
> workers from the API. This is less error-prone than pickling (marshmallow, 
> cloudpickle). I have a working prototype for this, but it needs to be updated 
> to the current Airflow. 
> 
> Another option is to use copyreg for fields that are difficult to serialize 
> and apply that to the jinja2 template fields. This allows one to pickle. 
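> 
> A minimal sketch of what I mean (Python 2 spelling; the _source attribute is 
> hypothetical - plain jinja2.Template objects don't keep their source, so we'd 
> have to stash it ourselves at construction time):
> 
>     import copy_reg  # named copyreg on Python 3
>     import jinja2
> 
>     def _reduce_template(tmpl):
>         # Recreate the template from its source string on unpickle,
>         # instead of trying to pickle the compiled render functions.
>         return (jinja2.Template, (tmpl._source,))
> 
>     copy_reg.pickle(jinja2.Template, _reduce_template)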
> 
> But I think we should deprecate pickling altogether and move over to 
> something external/API-wise.
> 
> Cheers
> Bolke
> 
> Sent from my iPad
> 
>> On 23 Oct 2017, at 19:35, Ruslan Dautkhanov <[email protected]> wrote:
>> 
>> I looked over cloudpickle that Alek mentioned. Looks cool - thanks for
>> referencing that.
>> https://github.com/cloudpipe/cloudpickle
>> It could be a drop-in replacement for pickle and low-hanging fruit for
>> fixing this issue.
>> I don't see an issue with storing cloudpickled objects in a database
>> table's blob field.
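>> 
>> Roughly this (a sketch - "dag" here stands for whatever object Airflow
>> pickles into the DagPickle row today):
>> 
>>     import pickle
>>     import cloudpickle
>> 
>>     blob = cloudpickle.dumps(dag)  # bytes; fits a blob/bytea column as-is
>>     # cloudpickle writes a standard pickle stream, so the worker side
>>     # can deserialize it with stdlib pickle:
>>     restored = pickle.loads(blob)
>> 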
>> Thanks.
>> 
>> 
>> -- 
>> Ruslan Dautkhanov
>> 
>> On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <[email protected]> wrote:
>> 
>>> One issue that's been standing in the way is the fact that Jinja template
>>> objects are not pickleable. That and the fact that when people pass objects
>>> into their DAG objects (through params, callbacks or whatever other ways),
>>> the serialization can get tangled and pickles become gigantic. People
>>> typically don't understand the implications in that context.
>>> 
>>> For now there's a workaround for the Jinja template pickling issue that
>>> limits what you can do with Jinja (you'll notice that extends and imports
>>> just won't work in a pickle/remote setting).
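>>> 
>>> The failure is easy to reproduce for anyone who wants to take a crack at
>>> it (compiled templates hold dynamically generated render functions, which
>>> is what pickle trips over):
>>> 
>>>     import pickle
>>>     import jinja2
>>> 
>>>     tmpl = jinja2.Template("Hello {{ name }}")
>>>     pickle.dumps(tmpl)  # fails: the compiled render function isn't pickleable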
>>> 
>>> I remember spending a few hours trying to pickle Jinja templates in the
>>> first weeks of the project and ultimately giving up. I'm sure someone could
>>> get that working.
>>> 
>>> Here's another related question: is the database a proper transport layer
>>> for pickles? It feels like a hack to me...
>>> 
>>> Another idea that was discussed was to create a new BaseDagFetcher
>>> abstraction, along with a replacement for the current implementation:
>>> FilesystemDagFetcher. Then people could write/use whatever other
>>> implementations like S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher,
>>> ....
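>>> 
>>> Something along these lines (names purely illustrative - nothing like this
>>> exists in the codebase today):
>>> 
>>>     import abc
>>> 
>>>     class BaseDagFetcher(object):
>>>         __metaclass__ = abc.ABCMeta  # Python 2 style
>>> 
>>>         @abc.abstractmethod
>>>         def fetch(self, dag_id):
>>>             """Return the parsed DAG object for dag_id."""
>>> 
>>>     class FilesystemDagFetcher(BaseDagFetcher):
>>>         """Current behavior: parse the .py files under the dags folder."""
>>> 
>>>         def __init__(self, dags_folder):
>>>             self.dags_folder = dags_folder
>>> 
>>>         def fetch(self, dag_id):
>>>             raise NotImplementedError  # would wrap today's DagBag loading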
>>> 
>>> Max
>>> 
>>>> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <[email protected]> wrote:
>>>> 
>>>> That's disappointing. The promise of being able to deploy code just to the
>>>> Airflow master, and have that automatically propagated to workers, was a
>>>> major selling point for us when we chose Airflow over its alternatives - it
>>>> would greatly simplify our deploy tooling, and free us from having to worry
>>>> about DAG definitions getting out of sync between the master and workers.
>>>> 
>>>> Perhaps the cloudpickle library, which came out of PySpark, could help
>>>> here: https://github.com/cloudpipe/cloudpickle. It appears to be
>>>> specifically designed for shipping Python code over a network.
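>>>> 
>>>> For instance (a sketch), it serializes functions by value when their
>>>> defining module isn't importable on the other side - exactly the
>>>> master/worker situation:
>>>> 
>>>>     import pickle
>>>>     import cloudpickle
>>>> 
>>>>     def transform(x):  # imagine this lives in a DAG file on the master
>>>>         return x * 2
>>>> 
>>>>     blob = cloudpickle.dumps(transform)  # captures the code, not a reference
>>>>     fn = pickle.loads(blob)              # loads even where the module is absent
>>>>     assert fn(21) == 42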
>>>> 
>>>> Alek
>>>> 
>>>> On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <[email protected]> wrote:
>>>> 
>>>>> FYI: there's been talk of deprecating pickling altogether, as it's very
>>>>> brittle.
>>>>> 
>>>>> Max
>>>>> 
>>>>>> On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <[email protected]> wrote:
>>>>> 
>>>>>> Can anyone help with this? Has anyone successfully used Airflow with
>>>>>> pickling turned on who can give details on their setup?
>>>>>> 
>>>>>> Thanks,
>>>>>> Alek
>>>>>> 
>>>>>>> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <[email protected]> wrote:
>>>>>> 
>>>>>>> Yes, everything's correctly imported - everything works fine when I run
>>>>>>> the scheduler without pickling turned on.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Alek
>>>>>>> 
>>>>>>> On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <[email protected]> wrote:
>>>>>>> 
>>>>>>>> The relevant part seems to be:
>>>>>>>> 
>>>>>>>> ImportError: No module named
>>>>>>>> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
>>>>>>>> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
>>>>>>>> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
>>>>>>>> non-zero exit status 1
>>>>>>>> 
>>>>>>>> Did you check that your task and script `fetch_orgmast.py` are correctly
>>>>>>>> importing all modules that they use?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Edgar
>>>>>>>> 
>>>>>>>>> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <[email protected]> wrote:
>>>>>>>> 
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> When running the scheduler as airflow scheduler -p and the worker (on a
>>>>>>>>> different box) as airflow worker -q foo, I get the following exception in
>>>>>>>>> the worker process when triggering a DAG run manually:
>>>>>>>>> 
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
>>>>>>>>>     args.func(args)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
>>>>>>>>>     DagPickle).filter(DagPickle.id == args.pickle).first()
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
>>>>>>>>>     ret = list(self[0:1])
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
>>>>>>>>>     return list(res)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
>>>>>>>>>     util.raise_from_cause(err)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>>>>>>>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
>>>>>>>>>     rows = [proc(row) for row in fetch]
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
>>>>>>>>>     loaded_instance, populate_existing, populators)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
>>>>>>>>>     dict_[key] = getter(row)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
>>>>>>>>>     return loads(value)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>>>>>>>>>     return load(file)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>>>>>>>>     dispatch[key](self)
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>>>>>>>>     klass = self.find_class(module, name)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>>>>>>>>     __import__(module)
>>>>>>>>> ImportError: No module named
>>>>>>>>> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
>>>>>>>>> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
>>>>>>>>> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
>>>>>>>>> non-zero exit status 1
>>>>>>>>> [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
>>>>>>>>> airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
>>>>>>>>> raised unexpected: AirflowException('Celery command failed',)
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
>>>>>>>>>     R = retval = fun(*args, **kwargs)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
>>>>>>>>>     return self.run(*args, **kwargs)
>>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
>>>>>>>>>     raise AirflowException('Celery command failed')
>>>>>>>>> AirflowException: Celery command failed
>>>>>>>>> 
>>>>>>>>> I'm using the CeleryExecutor and Postgres 9.6 for both the Airflow data
>>>>>>>>> and the Celery result backend. Python version is 2.7.6. What am I doing
>>>>>>>>> wrong?
>>>>>>>>> 
>>>>>>>>> Alek
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
