[https://issues.apache.org/jira/browse/AIRFLOW-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037109#comment-17037109]
ASF GitHub Bot commented on AIRFLOW-6795:
-----------------------------------------
nritholtz commented on pull request #7414: [AIRFLOW-6795] Increase text size
on data column in serialized_dag for MySQL
URL: https://github.com/apache/airflow/pull/7414
> serialized_dag table's data column text type is too small for mysql
> -------------------------------------------------------------------
>
> Key: AIRFLOW-6795
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6795
> Project: Apache Airflow
> Issue Type: Bug
> Components: serialization
> Affects Versions: 1.10.9
> Reporter: Nathaniel Ritholtz
> Priority: Major
>
> When upgrading to v1.10.9, I tried using the new store_serialized_dags flag.
> However, the scheduler was erroring out with:
> {code}
> scheduler_1 | Process DagFileProcessor2163-Process:
> scheduler_1 | Traceback (most recent call last):
> scheduler_1 | File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
> scheduler_1 | self.run()
> scheduler_1 | File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> scheduler_1 | self._target(*self._args, **self._kwargs)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
> scheduler_1 | pickle_dags)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1 | return func(*args, **kwargs)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1580, in process_file
> scheduler_1 | dag.sync_to_db()
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1 | return func(*args, **kwargs)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1514, in sync_to_db
> scheduler_1 | session=session
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
> scheduler_1 | return func(*args, **kwargs)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models/serialized_dag.py", line 118, in write_dag
> scheduler_1 | session.merge(cls(dag))
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2113, in merge
> scheduler_1 | _resolve_conflict_map=_resolve_conflict_map,
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2186, in _merge
> scheduler_1 | merged = self.query(mapper.class_).get(key[1])
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1004, in get
> scheduler_1 | return self._get_impl(ident, loading.load_on_pk_identity)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1116, in _get_impl
> scheduler_1 | return db_load_fn(self, primary_key_identity)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 284, in load_on_pk_identity
> scheduler_1 | return q.one()
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3347, in one
> scheduler_1 | ret = self.one_or_none()
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
> scheduler_1 | ret = list(self)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
> scheduler_1 | util.raise_from_cause(err)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
> scheduler_1 | reraise(type(exception), exception, tb=exc_tb, cause=cause)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
> scheduler_1 | raise value
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
> scheduler_1 | rows = [proc(row) for row in fetch]
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in <listcomp>
> scheduler_1 | rows = [proc(row) for row in fetch]
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 574, in _instance
> scheduler_1 | populators,
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 695, in _populate_full
> scheduler_1 | dict_[key] = getter(row)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line 1266, in process
> scheduler_1 | return process_value(impl_processor(value), dialect)
> scheduler_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 2407, in process
> scheduler_1 | return json_deserializer(value)
> scheduler_1 | File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
> scheduler_1 | return _default_decoder.decode(s)
> scheduler_1 | File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
> scheduler_1 | obj, end = self.raw_decode(s, idx=_w(s, 0).end())
> scheduler_1 | File "/usr/local/lib/python3.6/json/decoder.py", line 355, in raw_decode
> scheduler_1 | obj, end = self.scan_once(s, idx)
> {code}
> After further investigation, I found the cause was my usage of the
> airflow-db-cleanup DAG
> (https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/db-cleanup/airflow-db-cleanup.py).
> In that DAG, the params passed to the PythonOperators include a dict whose
> values are objects such as DagRun, so the serialized representation of the DAG
> is quite large. When I looked at the data column of this DAG's record in the
> serialized_dag table, I saw that the JSON was cut off mid-document at exactly
> 65535 characters. The data column is of type TEXT, which in MySQL holds at most
> 65535 bytes, so I assume MySQL silently truncated the serialized DAG on write,
> and the scheduler then failed to deserialize the resulting invalid JSON.
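> The failure mode is easy to reproduce outside Airflow; here is a minimal
> sketch with illustrative values (not the real DAG payload):
> {code}
> import json
>
> # Simulate MySQL silently truncating a large serialized DAG at the
> # 65535-byte limit of a TEXT column.
> serialized = json.dumps({"dag_id": "airflow-db-cleanup", "payload": "x" * 100000})
> truncated = serialized[:65535]
>
> json.loads(truncated)  # raises json.decoder.JSONDecodeError, as in the traceback above
> {code}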
> Would it be possible to change the TEXT column to MEDIUMTEXT? I made that
> change locally on my MySQL database, and the DAG then serialized and
> deserialized successfully.
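> For reference, a minimal sketch roughly equivalent to the change I applied
> locally (the connection URI is a placeholder, and column options such as
> NOT NULL are omitted for brevity):
> {code}
> # Widen serialized_dag.data from TEXT (max 65535 bytes) to MEDIUMTEXT (~16 MB).
> from sqlalchemy import create_engine, text
>
> engine = create_engine("mysql://airflow:PASSWORD@localhost/airflow")  # placeholder URI
> with engine.connect() as conn:
>     conn.execute(text("ALTER TABLE serialized_dag MODIFY data MEDIUMTEXT"))
> {code}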
--
This message was sent by Atlassian Jira
(v8.3.4#803005)