vitaly-krugl opened a new issue, #25075:
URL: https://github.com/apache/airflow/issues/25075
### Apache Airflow version
2.3.2
### What happened
My production deployment ran Airflow v1.8.0, and its database already contained DAGs, task instances, etc. When I upgraded to Airflow v2.3.2, running `airflow db upgrade` failed with:
`sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist`.
The problem is caused by an unsafe practice in multiple migration version scripts, including some recent ones: they import and use Airflow's current models, whose columns may not exist yet in the database being upgraded. More on this in the "Anything else" section below.
The logs/traceback:
```
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, Add ``max_tries`` column to ``task_instance``
Traceback (most recent call last):
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column dag.root_dag_id does not exist
LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                         ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/a/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/a/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/a/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/a/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/a/lib/python3.9/site-packages/airflow/cli/commands/db_command.py", line 82, in upgradedb
    db.upgradedb(to_revision=to_revision, from_revision=from_revision, show_sql_only=args.show_sql_only)
  File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/a/lib/python3.9/site-packages/airflow/utils/db.py", line 1449, in upgradedb
    command.upgrade(config, revision=to_revision or 'heads')
  File "/a/lib/python3.9/site-packages/alembic/command.py", line 322, in upgrade
    script.run_env()
  File "/a/lib/python3.9/site-packages/alembic/script/base.py", line 569, in run_env
    util.load_python_file(self.dir, "env.py")
  File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 94, in load_python_file
    module = load_module_py(module_id, path)
  File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 110, in load_module_py
    spec.loader.exec_module(module) # type: ignore
  File "<frozen importlib._bootstrap_external>", line 855, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 107, in <module>
    run_migrations_online()
  File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/a/lib/python3.9/site-packages/alembic/runtime/environment.py", line 853, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/a/lib/python3.9/site-packages/alembic/runtime/migration.py", line 623, in run_migrations
    step.migration_fn(**kw)
  File "/a/lib/python3.9/site-packages/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py", line 81, in upgrade
    dag = dagbag.get_dag(ti.dag_id)
  File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/a/lib/python3.9/site-packages/airflow/models/dagbag.py", line 217, in get_dag
    orm_dag = DagModel.get_current(root_dag_id, session=session)
  File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/a/lib/python3.9/site-packages/airflow/models/dag.py", line 2775, in get_current
    return session.query(cls).filter(cls.dag_id == dag_id).first()
  File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2734, in first
    return self.limit(1)._iter().first()
  File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
    result = self.session.execute(
  File "/a/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1670, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/a/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
    ret = self._execute_context(
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
    self._handle_dbapi_exception(
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
    util.raise_(
  File "/a/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist
LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                         ^
[SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.timetable_description AS dag_timetable_description, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after
FROM dag
WHERE dag.dag_id = %(dag_id_1)s
LIMIT %(param_1)s]
[parameters: {'dag_id_1': 'dns_dv_challenge_initiate_v2', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/14/f405)
```
### What you think should happen instead
The database upgrade should have succeeded.
### How to reproduce
1. Install Airflow 1.8.0 and initialize the Airflow database.
2. Create and run some DAGs so that the `dag` table gets populated.
3. Upgrade Airflow to 2.3.2 and upgrade the database with `airflow db upgrade`.
### Operating System
OS-X
### Versions of Apache Airflow Providers
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-sqlite==2.1.3
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
The SQLAlchemy `ProgrammingError` exception occurred during the Alembic database upgrade 127d2bf2dfa7 -> cc1e65623dc7. Searching the Airflow v2.3.2 sources for the target revision cc1e65623dc7 turns up this database migration version script:
https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py
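Locating the script for a given revision id, as described above, can be scripted. This is a minimal sketch, assuming each Alembic version script assigns its id at module level (e.g. `revision = 'cc1e65623dc7'`); `find_revision_script` is a hypothetical helper, not part of Airflow or Alembic.

```python
import re
from pathlib import Path

# Matches a module-level Alembic revision assignment such as
#   revision = 'cc1e65623dc7'   or   revision: str = "cc1e65623dc7"
# Lines starting with down_revision are not matched because of the ^ anchor.
REVISION_RE = re.compile(
    r"""^revision(?:\s*:\s*str)?\s*=\s*['"]([0-9a-f]+)['"]""", re.MULTILINE
)


def find_revision_script(versions_dir, revision):
    """Return the path of the script under versions_dir that defines `revision`, or None."""
    for path in sorted(Path(versions_dir).glob("*.py")):
        match = REVISION_RE.search(path.read_text(encoding="utf-8"))
        if match and match.group(1) == revision:
            return path
    return None
```

In a 2.3.2 source checkout, `find_revision_script("airflow/migrations/versions", "cc1e65623dc7")` should point at the `0023_1_8_2_...` script linked above.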
In that script, the following imports from Airflow's own code appear at https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L31-L33:

```python
from airflow import settings
from airflow.compat.sqlalchemy import inspect
from airflow.models import DagBag
```
And the code at lines https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L74-L81 triggers the aforementioned `ProgrammingError` exception:

```python
dagbag = DagBag(settings.DAGS_FOLDER)
query = session.query(sa.func.count(TaskInstance.max_tries)).filter(TaskInstance.max_tries == -1)
# Separate db query in batch to prevent loading entire table
# into memory and cause out of memory error.
while query.scalar():
    tis = session.query(TaskInstance).filter(TaskInstance.max_tries == -1).limit(BATCH_SIZE).all()
    for ti in tis:
        dag = dagbag.get_dag(ti.dag_id)
```
What's happening? Recall that the migration to database revision cc1e65623dc7 failed with `sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist`. The call to `dagbag.get_dag(ti.dag_id)` ends up in `DagModel.get_current()`, which queries the `dag` table through the current `DagModel` mapping, so the generated `SELECT` includes the `root_dag_id` column (see the SQL in the traceback above). That column didn't exist yet at Airflow database revision 127d2bf2dfa7 (from Airflow v1.8.0); it is added only later, by the Airflow v1.10.7 version script
https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0045_1_10_7_add_root_dag_id_to_dag.py.
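The mechanism can be reproduced outside Airflow. This is a minimal sketch, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database standing in for PostgreSQL; `DagModel` here is a hypothetical three-column stand-in, not Airflow's real class. It maps `root_dag_id`, while the simulated older `dag` table lacks that column, so the ORM query fails just as `DagModel.get_current()` did.

```python
import sqlalchemy as sa
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagModel(Base):
    """Hypothetical stand-in for a *current* ORM model (not Airflow's real class).

    It maps root_dag_id, a column the simulated older schema does not have.
    """

    __tablename__ = "dag"
    dag_id = sa.Column(sa.String(250), primary_key=True)
    root_dag_id = sa.Column(sa.String(250))
    is_paused = sa.Column(sa.Boolean)


# Simulate the older database: the dag table exists, but without root_dag_id.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE dag (dag_id VARCHAR(250) PRIMARY KEY, is_paused BOOLEAN)"))
    conn.execute(sa.text("INSERT INTO dag (dag_id, is_paused) VALUES ('example_dag', 0)"))

error_message = None
with Session(engine) as session:
    try:
        # The ORM SELECT lists every mapped column, including root_dag_id.
        session.query(DagModel).filter(DagModel.dag_id == "example_dag").first()
    except OperationalError as exc:
        # SQLite reports "no such column"; PostgreSQL raises UndefinedColumn instead.
        error_message = str(exc.orig)

print(error_message)
```

The model class never changed in this script; only the database lags behind it, which is exactly the situation of an old database being upgraded by new code.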
The problem with 0023_1_8_2_add_max_tries_column_to_task_instance.py is that it imports and uses `DagBag` to fetch DAG models. When this migration version script was written, those models' columns matched the database schema at its revision, but that was no longer the case when I actually ran `airflow db upgrade`: by then the DAG model had gained new columns, so it tried to load the column `root_dag_id` (because the upgrade was running Airflow v2.3.2 code, whose model has this column), but the actual database (initialized by Airflow v1.8.0) didn't have this column yet!
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)