Hi,

The BLOB type in MySQL is not very large; from my (also insufficient) memory, it's 64 KB. You probably want to alter the `pickle` column in your DB to a MEDIUMBLOB or LONGBLOB.
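For reference, something along these lines should do it (untested; back up the metadata DB first). Since the trace goes through xcom_pull(), the xcom table's value column may need the same treatment — that second statement is an assumption on my part:

```sql
-- Widen the pickle column from BLOB (max 65,535 bytes)
-- to MEDIUMBLOB (max ~16 MB).
ALTER TABLE dag_pickle MODIFY COLUMN pickle MEDIUMBLOB;

-- Possibly also needed, if the overflowing value is the XCom
-- payload rather than the DAG pickle (assumption, not verified):
ALTER TABLE xcom MODIFY COLUMN value MEDIUMBLOB;
```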
Max

On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <[email protected]> wrote:

> Hello,
>
> Since we switched our Airflow system to using MySQL as a model store, I
> have been getting errors like: _pickle.UnpicklingError: pickle exhausted
> before end of frame. The trace is below. It occurs after an xcom_pull()
> and the trace goes through sqlalchemy. A deeper dive into the DB suggests
> that the pickle is being stored in the dag_pickle.pickle column as a blob,
> which has a max size of 65,535 bytes.
>
> airflow> desc dag_pickle;
> +--------------+------------+------+-----+---------+----------------+
> | Field        | Type       | Null | Key | Default | Extra          |
> +--------------+------------+------+-----+---------+----------------+
> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
> | pickle       | blob       | YES  |     | NULL    |                |
> | created_dttm | datetime   | YES  |     | NULL    |                |
> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
> +--------------+------------+------+-----+---------+----------------+
>
> Any ideas? I wonder if there is an easy way to switch to a mediumblob.
> I'm looking at models.py now.
> thanks,
>
> -Louis
>
> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted before end of frame
> Traceback (most recent call last):
>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>     result = task_copy.execute(context=context)
>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>     return pull_fn(task_id=task_ids)
>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>     result = query.first()
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>     ret = list(self[0:1])
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>     return list(res)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>     util.raise_from_cause(err)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>     raise value
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
>     for row in fetch]
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
>     for row in fetch]
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
>     rows = [keyed_tuple([proc(row) for proc in process])
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>     return loads(value)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
>     return load(file)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
>     obj = pik.load()
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
>     dispatch[key[0]](self)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
>     "pickle exhausted before end of frame")
> _pickle.UnpicklingError: pickle exhausted before end of frame
> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to [redacted]
> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted before end of frame
