Verified hack solution for this problem:
ALTER TABLE xcom MODIFY value MEDIUMBLOB;
-> that alleviates the 'pickle exhausted' error for xcom_pull()
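
For anyone who wants to see the failure mode in isolation, here is a minimal
stdlib-only sketch. It assumes the server silently truncated the stored value
to the column's capacity (what MySQL does with strict mode off); the names
BLOB_MAX_BYTES and roundtrip_through_blob are made up for illustration and are
not part of Airflow. The exact exception text depends on which unpickler is in
use, so it may not match the message in the trace.

```python
import pickle

BLOB_MAX_BYTES = 2**16 - 1  # 65,535 bytes: capacity of a MySQL BLOB column


def roundtrip_through_blob(obj, limit=BLOB_MAX_BYTES):
    """Pickle obj, cut the bytes to `limit`, then try to unpickle them.

    Returns (result, pickled_size). `result` is the restored object when the
    pickle fits, or the exception raised when the truncated pickle is loaded.
    """
    data = pickle.dumps(obj, protocol=4)
    stored = data[:limit]  # stand-in for what a too-small column keeps
    try:
        return pickle.loads(stored), len(data)
    except (pickle.UnpicklingError, EOFError) as exc:
        return exc, len(data)


if __name__ == "__main__":
    small = ["ok"]
    big = [str(i) * 10 for i in range(20000)]  # pickles to well over 64 KB
    for label, payload in (("small", small), ("big", big)):
        result, size = roundtrip_through_blob(payload)
        print(label, size, type(result).__name__)
```

The small payload survives the round trip; the large one comes back as an
unpickling error, which is the same class of failure xcom_pull() hit.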

A longer-term solution might involve backing PickleType with a
LargeBinary(length=...) to force SQLAlchemy/MySQL to use something bigger,
around line 3162 in models.py.
See:
http://docs.sqlalchemy.org/en/latest/core/type_basics.html#sqlalchemy.types.LargeBinary
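
A rough sketch of that longer-term idea, not Airflow's actual model code: the
class MediumPickleType and the table xcom_sketch are hypothetical names, and
the approach assumes a PickleType subclass may override `impl` with a
LargeBinary instance that carries an explicit length. It only compiles the
CREATE TABLE statement for the MySQL dialect, so no database connection or
driver is needed to try it.

```python
from sqlalchemy import Column, Integer, LargeBinary, MetaData, PickleType, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

MEDIUMBLOB_MAX_BYTES = 2**24 - 1  # 16,777,215 bytes: capacity of a MEDIUMBLOB


class MediumPickleType(PickleType):
    """Hypothetical PickleType whose binary column declares a length."""

    # Overrides the default, unsized LargeBinary used by PickleType.
    impl = LargeBinary(length=MEDIUMBLOB_MAX_BYTES)


metadata = MetaData()

# Illustrative table only; the real xcom table has more columns.
xcom_sketch = Table(
    "xcom_sketch",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("value", MediumPickleType()),
)

# Render the DDL that would be sent to MySQL, without connecting to it.
ddl = str(CreateTable(xcom_sketch).compile(dialect=mysql.dialect()))
print(ddl)
```

The rendered column type should come out as BLOB(16777215), and MySQL picks
the smallest BLOB variant that can hold the declared length, which here is
MEDIUMBLOB. This only affects newly created tables; an existing xcom table
would still need the ALTER TABLE above or a migration.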

On 7/11/16, 2:06 PM, "Clark, Louis" <[email protected]> wrote:

>thanks.  Yeah, looks like the only way to do it is to alter the table
>directly.  Ugly.  The use of a blob vs something else appears to be
>embedded in SQLAlchemy in PickleType.  Celery has had a similar problem:
><https://github.com/celery/celery/issues/461>.
>
>thanks,
>
>-Louis
>
>
>
>
>On 7/11/16, 2:02 PM, "Maxime Beauchemin" <[email protected]>
>wrote:
>
>>Hi,
>>
>>The blob type in MySQL is not very large; from my (also insufficient)
>>memory, it's 64 KB. You probably want to alter the `pickle` field in your
>>DB to a MEDIUMBLOB or LONGBLOB.
>>
>>Max
>>
>>On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <[email protected]>
>>wrote:
>>
>>> Hello,
>>>   Since we switched our Airflow system to using MySQL as a model store, I
>>> have been getting errors like: _pickle.UnpicklingError: pickle exhausted
>>> before end of frame.  Trace is below.  It occurs after an xcom_pull() and
>>> the trace goes through sqlalchemy.  A deeper dive into the DB suggests that
>>> the pickle is being stored in the dag_pickle.pickle column as a blob, which
>>> has a max size of 65,535 bytes.
>>> airflow> desc dag_pickle;
>>> +--------------+------------+------+-----+---------+----------------+
>>> | Field        | Type       | Null | Key | Default | Extra          |
>>> +--------------+------------+------+-----+---------+----------------+
>>> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
>>> | pickle       | blob       | YES  |     | NULL    |                |
>>> | created_dttm | datetime   | YES  |     | NULL    |                |
>>> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
>>> +--------------+------------+------+-----+---------+----------------+
>>>
>>> Any ideas?  I wonder if there is an easy way to switch to a mediumblob.
>>> I'm looking at models.py now.
>>>
>>> thanks,
>>>
>>> -Louis
>>>
>>> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
>>> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted before end of frame
>>> Traceback (most recent call last):
>>>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>>>     result = task_copy.execute(context=context)
>>>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
>>>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>>>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
>>>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>>>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>>>     return pull_fn(task_id=task_ids)
>>>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>>>     result = func(*args, **kwargs)
>>>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>>>     result = query.first()
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>>>     ret = list(self[0:1])
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>>>     return list(res)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>>>     util.raise_from_cause(err)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>>>     raise value
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
>>>     for row in fetch]
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
>>>     for row in fetch]
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
>>>     rows = [keyed_tuple([proc(row) for proc in process])
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>>>     return loads(value)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
>>>     return load(file)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
>>>     obj = pik.load()
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
>>>     dispatch[key[0]](self)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
>>>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
>>>     "pickle exhausted before end of frame")
>>> _pickle.UnpicklingError: pickle exhausted before end of frame
>>> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
>>> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to [redacted]
>>> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted before end of frame
>
