Thanks. Yeah, it looks like the only way to do it is to alter the table directly. Ugly. The choice of a BLOB vs. something larger appears to be embedded in SQLAlchemy's PickleType. Celery has had a similar problem: <https://github.com/celery/celery/issues/461>.
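For the record, a sketch of the numbers involved and the direct alteration (table and column names are from the `desc dag_pickle` output quoted below; the capacities are fixed by MySQL's BLOB storage formats):

```python
# MySQL BLOB-family capacities (fixed by the storage format, not tunable):
BLOB_MAX = 2**16 - 1        # 65,535 bytes -- what dag_pickle.pickle has now
MEDIUMBLOB_MAX = 2**24 - 1  # 16,777,215 bytes (~16 MB)
LONGBLOB_MAX = 2**32 - 1    # 4,294,967,295 bytes (~4 GB)

# The direct alteration discussed above, to be run against the
# Airflow metadata database:
DDL = "ALTER TABLE dag_pickle MODIFY pickle MEDIUMBLOB;"
```

MEDIUMBLOB buys roughly 256x the headroom; LONGBLOB would work too, at the cost of a larger max row size.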
thanks,
-Louis

On 7/11/16, 2:02 PM, "Maxime Beauchemin" <[email protected]> wrote:

>Hi,
>
>The blob type in MySQL is not very large; from my (also insufficient)
>memory, it's 64 KB. You probably want to alter the `pickle` field in your
>DB to a MEDIUMBLOB or LONGBLOB.
>
>Max
>
>On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <[email protected]> wrote:
>
>> Hello,
>> Since we switched our Airflow system to using MySQL as a model store, I
>> have been getting errors like: _pickle.UnpicklingError: pickle exhausted
>> before end of frame. The trace is below. It occurs after an xcom_pull()
>> and the trace goes through SQLAlchemy. A deeper dive into the DB suggests
>> that the pickle is being stored in the dag_pickle.pickle column as a
>> BLOB, which has a max size of 65,535 bytes.
>>
>> airflow> desc dag_pickle;
>> +--------------+------------+------+-----+---------+----------------+
>> | Field        | Type       | Null | Key | Default | Extra          |
>> +--------------+------------+------+-----+---------+----------------+
>> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
>> | pickle       | blob       | YES  |     | NULL    |                |
>> | created_dttm | datetime   | YES  |     | NULL    |                |
>> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
>> +--------------+------------+------+-----+---------+----------------+
>>
>> Any ideas? I wonder if there is an easy way to switch to a MEDIUMBLOB.
>> I'm looking at models.py now.
>>
>> thanks,
>>
>> -Louis
>>
>> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing
>> <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
>> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted
>> before end of frame
>> Traceback (most recent call last):
>>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>>     result = task_copy.execute(context=context)
>>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
>>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
>>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>>     return pull_fn(task_id=task_ids)
>>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>>     result = func(*args, **kwargs)
>>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>>     result = query.first()
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>>     ret = list(self[0:1])
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>>     return list(res)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>>     util.raise_from_cause(err)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>>     raise value
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
>>     for row in fetch]
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
>>     for row in fetch]
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
>>     rows = [keyed_tuple([proc(row) for proc in process])
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>>     return loads(value)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
>>     return load(file)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
>>     obj = pik.load()
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
>>     dispatch[key[0]](self)
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
>>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
>>     "pickle exhausted before end of frame")
>> _pickle.UnpicklingError: pickle exhausted before end of frame
>> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
>> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to
>> [redacted]
>> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted
>> before end of frame
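For anyone debugging this later: the symptom can be reproduced outside Airflow with just the stdlib, assuming the cause is MySQL silently truncating the stored pickle at the 64 KB BLOB boundary (the payload below is synthetic, not the real XCom value):

```python
import pickle

# A synthetic payload that pickles to well over 64 KB.
payload = {"run successes": ["row-%06d" % i for i in range(10000)]}
blob = pickle.dumps(payload, protocol=4)  # protocol 4 introduced framing
assert len(blob) > 65535

# A MySQL BLOB column keeps only the first 65,535 bytes; unpickling the
# truncated value then fails -- typically with the same
# "pickle exhausted before end of frame" error seen in the trace above,
# or an EOFError, depending on where the cut lands.
truncated = blob[:65535]
try:
    pickle.loads(truncated)
except (pickle.UnpicklingError, EOFError) as exc:
    print("unpickling failed:", exc)
```

If this matches what you're seeing, the fix is widening the column (MEDIUMBLOB/LONGBLOB, as discussed above) rather than shrinking the payload.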
