Verified hack solution for this problem: alter table xcom modify value MEDIUMBLOB; That alleviates the 'pickle exhausted' error on xcom_pull().
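For anyone who wants to reproduce the failure mode locally before altering the database, here is a minimal stand-alone sketch (the payload is a made-up stand-in, not the actual xcom value from this thread): it pickles an object larger than MySQL's 65,535-byte BLOB limit, then shows that unpickling the bytes a BLOB column would have truncated fails partway through the stream, which is essentially what xcom_pull() hits.

```python
import pickle

# Hypothetical payload standing in for a large xcom value.
payload = {"run successes": ["%06d" % i for i in range(20000)]}
blob = pickle.dumps(payload)
assert len(blob) > 65535, "demo payload must exceed the 65,535-byte BLOB limit"

# A MySQL BLOB column holds at most 65,535 bytes; an oversized pickle gets
# cut short on write, and unpickling the stored bytes then fails mid-stream.
truncated = blob[:65535]
try:
    pickle.loads(truncated)
    print("unexpectedly unpickled")
except Exception as exc:
    print("unpickling failed:", type(exc).__name__)
```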
A longer-term solution might involve using PickleType(length=...) to force
SQLAlchemy/MySQL to use something bigger around line 3162 in models.py. See:
http://docs.sqlalchemy.org/en/latest/core/type_basics.html#sqlalchemy.types.LargeBinary

On 7/11/16, 2:06 PM, "Clark, Louis" <[email protected]> wrote:

>thanks. Yeah, looks like the only way to do it is to alter the table
>directly. Ugly. The use of a blob vs something else appears to be
>embedded in SqlAlchemy in PickleType. Celery has had a similar problem:
><https://github.com/celery/celery/issues/461>.
>
>thanks,
>
>-Louis
>
>
>On 7/11/16, 2:02 PM, "Maxime Beauchemin" <[email protected]> wrote:
>
>>Hi,
>>
>>The blob type in MySql is not very large, from my (also insufficient)
>>memory, it's 64kb. You probably want to alter the `pickle` field in
>>your DB to a MEDIUMBLOB or LONGBLOB.
>>
>>Max
>>
>>On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <[email protected]> wrote:
>>
>>> Hello,
>>> Since we switched our Airflow system to using MySQL as a model store,
>>> I have been getting errors like: _pickle.UnpicklingError: pickle
>>> exhausted before end of frame. Trace is below. It occurs after an
>>> xcom_pull() and the trace goes through sqlalchemy. A deeper dive into
>>> the DB suggests that the pickle is being stored in the
>>> dag_pickle.pickle column as a blob, which has a max size of 65,535
>>> bytes.
>>> airflow> desc dag_pickle;
>>> +--------------+------------+------+-----+---------+----------------+
>>> | Field        | Type       | Null | Key | Default | Extra          |
>>> +--------------+------------+------+-----+---------+----------------+
>>> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
>>> | pickle       | blob       | YES  |     | NULL    |                |
>>> | created_dttm | datetime   | YES  |     | NULL    |                |
>>> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
>>> +--------------+------------+------+-----+---------+----------------+
>>>
>>> Any ideas? I wonder if there is an easy way to switch to a mediumblob.
>>> I'm looking at models.py now.
>>>
>>> thanks,
>>>
>>> -Louis
>>>
>>> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing
>>> <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
>>> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted
>>> before end of frame
>>> Traceback (most recent call last):
>>>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>>>     result = task_copy.execute(context=context)
>>>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
>>>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>>>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
>>>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>>>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>>>     return pull_fn(task_id=task_ids)
>>>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>>>     result = func(*args, **kwargs)
>>>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>>>     result = query.first()
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>>>     ret = list(self[0:1])
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>>>     return list(res)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>>>     util.raise_from_cause(err)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>>>     raise value
>>>   File
>>> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
>>>     for row in fetch]
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
>>>     for row in fetch]
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
>>>     rows = [keyed_tuple([proc(row) for proc in process])
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>>>     return loads(value)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
>>>     return load(file)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
>>>     obj = pik.load()
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
>>>     dispatch[key[0]](self)
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
>>>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
>>>     "pickle exhausted before end of frame")
>>> _pickle.UnpicklingError: pickle exhausted before end of frame
>>> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
>>> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to
>>> [redacted]
>>> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted
>>> before end of frame
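For reference, these are the MySQL binary column size tiers behind the MEDIUMBLOB/LONGBLOB suggestion above. The helper below is only an illustration (smallest_blob_type is a made-up name, not Airflow or SQLAlchemy code) of picking the smallest type that fits a pickled payload:

```python
# MySQL binary column capacities in bytes, smallest to largest.
# A plain BLOB (what an unsized LargeBinary/PickleType column maps to
# on MySQL) tops out at 65,535 bytes.
BLOB_CAPACITY = {
    "TINYBLOB": 2**8 - 1,
    "BLOB": 2**16 - 1,        # 65,535
    "MEDIUMBLOB": 2**24 - 1,  # 16,777,215 (~16 MB)
    "LONGBLOB": 2**32 - 1,    # 4,294,967,295 (~4 GB)
}

def smallest_blob_type(n_bytes):
    """Return the smallest MySQL blob type that can hold n_bytes."""
    for name, capacity in BLOB_CAPACITY.items():
        if n_bytes <= capacity:
            return name
    raise ValueError("payload too large even for LONGBLOB")

print(smallest_blob_type(70_000))  # MEDIUMBLOB
```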
