Hello,

Since we switched our Airflow system to use MySQL as a model store, I have
been getting errors like: _pickle.UnpicklingError: pickle exhausted before end
of frame.  The full trace is below.  The error occurs after an xcom_pull(), and
the trace goes through sqlalchemy.  A deeper dive into the DB suggests that the
pickle is being stored in the dag_pickle.pickle column as a blob, which has a
max size of 65,535 bytes:

airflow> desc dag_pickle;
+--------------+------------+------+-----+---------+----------------+
| Field        | Type       | Null | Key | Default | Extra          |
+--------------+------------+------+-----+---------+----------------+
| id           | int(11)    | NO   | PRI | NULL    | auto_increment |
| pickle       | blob       | YES  |     | NULL    |                |
| created_dttm | datetime   | YES  |     | NULL    |                |
| pickle_hash  | bigint(20) | YES  |     | NULL    |                |
+--------------+------------+------+-----+---------+----------------+
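
The suspected failure mode can be reproduced without a database. The sketch below is only an illustration: it assumes the server silently truncates oversized values at the BLOB limit (which MySQL does only when strict mode is off), and the name survives_blob is hypothetical. The exact exception message also depends on whether the C or pure-Python unpickler is used, so it may not match the one in the trace word for word.

```python
import pickle

BLOB_MAX_BYTES = 65535  # maximum length of a MySQL BLOB column


def survives_blob(obj, max_bytes=BLOB_MAX_BYTES):
    """Return True if obj still unpickles after being cut to max_bytes.

    The slice stands in for a column that silently truncates long values.
    """
    stored = pickle.dumps(obj, protocol=4)[:max_bytes]
    try:
        return pickle.loads(stored) == obj
    except (pickle.UnpicklingError, EOFError):
        # A truncated pickle ends in the middle of a frame or opcode.
        return False


small = list(range(100))
large = ["row %d" % i for i in range(20000)]

print(len(pickle.dumps(large, protocol=4)) > BLOB_MAX_BYTES)
print(survives_blob(small))
print(survives_blob(large))
```

Small payloads round-trip untouched, which would explain why the error only shows up for some tasks.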

Any ideas?  I wonder if there is an easy way to switch to a mediumblob.  I'm 
looking at models.py now.
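
As a rough sketch of the mediumblob idea, the helper below picks the smallest MySQL binary type that fits a given payload size and builds the matching ALTER TABLE statement. The function names are hypothetical and nothing here is taken from Airflow itself. It only builds a string and does not touch the database. The table and column are parameters because the failing call is xcom_pull(), so the column actually being read may not be dag_pickle.pickle.

```python
# MySQL binary column types and their maximum lengths in bytes.
MYSQL_BLOB_LIMITS = [
    ("TINYBLOB", 2**8 - 1),
    ("BLOB", 2**16 - 1),
    ("MEDIUMBLOB", 2**24 - 1),
    ("LONGBLOB", 2**32 - 1),
]


def smallest_blob_type(n_bytes):
    """Return the smallest MySQL blob type that can hold n_bytes."""
    for name, limit in MYSQL_BLOB_LIMITS:
        if n_bytes <= limit:
            return name
    raise ValueError("payload of %d bytes exceeds LONGBLOB" % n_bytes)


def alter_statement(table, column, n_bytes):
    """Build (but do not execute) an ALTER TABLE statement for the column."""
    return "ALTER TABLE `{}` MODIFY `{}` {}".format(
        table, column, smallest_blob_type(n_bytes)
    )


# Example: room for pickles of up to about 1 MB.
print(alter_statement("dag_pickle", "pickle", 1000000))
```

Changing the column by hand this way would leave the type declared in models.py out of step with the database, so the model definition would probably need a matching change.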

thanks,

-Louis

[2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
[2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted before end of frame
Traceback (most recent call last):
  File "/home/myuser/src/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
    tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
  File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
    return pull_fn(task_id=task_ids)
  File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/myuser/src/airflow/models.py", line 3240, in get_one
    result = query.first()
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
    ret = list(self[0:1])
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
    return list(res)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
    util.raise_from_cause(err)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
    for row in fetch]
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
    for row in fetch]
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
    rows = [keyed_tuple([proc(row) for proc in process])
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
    return loads(value)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
    return load(file)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
    obj = pik.load()
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
    dispatch[key[0]](self)
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
    self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
  File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
    "pickle exhausted before end of frame")
_pickle.UnpicklingError: pickle exhausted before end of frame
[2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
[2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to [redacted]
[2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted before end of frame