Nathaniel Ritholtz created AIRFLOW-6795:
-------------------------------------------

             Summary: serialized_dag table's data type max length is too small
                 Key: AIRFLOW-6795
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6795
             Project: Apache Airflow
          Issue Type: Bug
          Components: serialization
    Affects Versions: 1.10.9
            Reporter: Nathaniel Ritholtz


After upgrading to v1.10.9, I tried the new store_serialized_dags flag. 
However, the scheduler kept failing with:
{code}
scheduler_1  | Process DagFileProcessor2163-Process:
scheduler_1  | Traceback (most recent call last):
scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
scheduler_1  |     self.run()
scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
scheduler_1  |     self._target(*self._args, **self._kwargs)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
scheduler_1  |     pickle_dags)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
scheduler_1  |     return func(*args, **kwargs)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1580, in process_file
scheduler_1  |     dag.sync_to_db()
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
scheduler_1  |     return func(*args, **kwargs)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1514, in sync_to_db
scheduler_1  |     session=session
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
scheduler_1  |     return func(*args, **kwargs)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/serialized_dag.py", line 118, in write_dag
scheduler_1  |     session.merge(cls(dag))
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2113, in merge
scheduler_1  |     _resolve_conflict_map=_resolve_conflict_map,
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2186, in _merge
scheduler_1  |     merged = self.query(mapper.class_).get(key[1])
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1004, in get
scheduler_1  |     return self._get_impl(ident, loading.load_on_pk_identity)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1116, in _get_impl
scheduler_1  |     return db_load_fn(self, primary_key_identity)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 284, in load_on_pk_identity
scheduler_1  |     return q.one()
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3347, in one
scheduler_1  |     ret = self.one_or_none()
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
scheduler_1  |     ret = list(self)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
scheduler_1  |     util.raise_from_cause(err)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
scheduler_1  |     reraise(type(exception), exception, tb=exc_tb, cause=cause)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
scheduler_1  |     raise value
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
scheduler_1  |     rows = [proc(row) for row in fetch]
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in <listcomp>
scheduler_1  |     rows = [proc(row) for row in fetch]
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 574, in _instance
scheduler_1  |     populators,
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 695, in _populate_full
scheduler_1  |     dict_[key] = getter(row)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line 1266, in process
scheduler_1  |     return process_value(impl_processor(value), dialect)
scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 2407, in process
scheduler_1  |     return json_deserializer(value)
scheduler_1  |   File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
scheduler_1  |     return _default_decoder.decode(s)
scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
scheduler_1  |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 355, in raw_decode
scheduler_1  |     obj, end = self.scan_once(s, idx)
{code}

After further investigation, I found the cause was my use of the 
airflow-db-cleanup DAG 
(https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/db-cleanup/airflow-db-cleanup.py).
 In that DAG, the params passed to the PythonOperators include a hash whose 
values are objects such as DagRun, so the serialized form of the DAG is quite 
large. When I looked at the data column of the serialized_dag record for this 
DAG, the JSON was cut off mid-document at exactly 65535 characters. The column 
is of type TEXT, which on MySQL has a maximum length of 65535. So what I assume 
happened is that the write which stored the serialized DAG truncated the JSON 
somewhere in the middle, leaving an unparseable document.
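The failure mode is easy to reproduce outside Airflow. A minimal sketch (the blob below is synthetic, not an actual serialized DAG) showing that a JSON document truncated at MySQL's 65535-character TEXT limit can no longer be parsed:

```python
import json

# Build a JSON blob larger than MySQL's TEXT limit (65535 bytes).
blob = json.dumps({"tasks": ["task_%d" % i for i in range(10000)]})
assert len(blob) > 65535

# Simulate what MySQL stores after silently truncating the value.
truncated = blob[:65535]

try:
    json.loads(truncated)
except json.JSONDecodeError as exc:
    # This is the same json.decoder failure seen in the scheduler traceback.
    print("deserialization fails:", exc)
```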

Would it be possible to change the column from TEXT to MEDIUMTEXT? Locally I 
made that change on my MySQL database and the DAG serialized and deserialized 
successfully.
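For reference, a minimal SQLAlchemy sketch of what the column definition could look like (the table name here is illustrative, not Airflow's actual model or migration): the column stays a generic Text on other databases but emits MEDIUMTEXT, with its 16 MB limit, on MySQL.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# Illustrative table: a "data" column that is TEXT generically but
# MEDIUMTEXT (16 MB instead of 64 KB) when the dialect is MySQL.
demo = sa.Table(
    "serialized_dag_demo",
    sa.MetaData(),
    sa.Column("data", sa.Text().with_variant(mysql.MEDIUMTEXT(), "mysql")),
)

# Compile the DDL for the MySQL dialect to confirm the variant is used.
ddl = str(sa.schema.CreateTable(demo).compile(dialect=mysql.dialect()))
print(ddl)
```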



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
