vitaly-krugl opened a new issue, #25075:
URL: https://github.com/apache/airflow/issues/25075

   ### Apache Airflow version
   
   2.3.2
   
   ### What happened
   
   My production deployment was running Airflow v1.8.0 and already had DAGs, Task Instances, etc., in its tables. When upgrading to Airflow v2.3.2, running `airflow db upgrade` failed with: `sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist`.
   
   The problem is due to an unsafe practice used in the implementation of multiple migration version scripts, including some recent ones. More on this in the "Anything else" section below.
   
   
   The logs/traceback:
   ```
   INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
   INFO  [alembic.runtime.migration] Will assume transactional DDL.
   INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, Add ``max_tries`` column to ``task_instance``
   Traceback (most recent call last):
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.UndefinedColumn: column dag.root_dag_id does not exist
   LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                            ^

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/a/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/a/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/a/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/cli/commands/db_command.py", line 82, in upgradedb
       db.upgradedb(to_revision=to_revision, from_revision=from_revision, show_sql_only=args.show_sql_only)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/utils/db.py", line 1449, in upgradedb
       command.upgrade(config, revision=to_revision or 'heads')
     File "/a/lib/python3.9/site-packages/alembic/command.py", line 322, in upgrade
       script.run_env()
     File "/a/lib/python3.9/site-packages/alembic/script/base.py", line 569, in run_env
       util.load_python_file(self.dir, "env.py")
     File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 94, in load_python_file
       module = load_module_py(module_id, path)
     File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 110, in load_module_py
       spec.loader.exec_module(module)  # type: ignore
     File "<frozen importlib._bootstrap_external>", line 855, in exec_module
     File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
     File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 107, in <module>
       run_migrations_online()
     File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
       context.run_migrations()
     File "<string>", line 8, in run_migrations
     File "/a/lib/python3.9/site-packages/alembic/runtime/environment.py", line 853, in run_migrations
       self.get_context().run_migrations(**kw)
     File "/a/lib/python3.9/site-packages/alembic/runtime/migration.py", line 623, in run_migrations
       step.migration_fn(**kw)
     File "/a/lib/python3.9/site-packages/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py", line 81, in upgrade
       dag = dagbag.get_dag(ti.dag_id)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/models/dagbag.py", line 217, in get_dag
       orm_dag = DagModel.get_current(root_dag_id, session=session)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/models/dag.py", line 2775, in get_current
       return session.query(cls).filter(cls.dag_id == dag_id).first()
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2734, in first
       return self.limit(1)._iter().first()
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
       result = self.session.execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1670, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/a/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
       ret = self._execute_context(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
       self._handle_dbapi_exception(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
       util.raise_(
     File "/a/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist
   LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                            ^

   [SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.timetable_description AS dag_timetable_description, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after
   FROM dag
   WHERE dag.dag_id = %(dag_id_1)s
    LIMIT %(param_1)s]
   [parameters: {'dag_id_1': 'dns_dv_challenge_initiate_v2', 'param_1': 1}]
   (Background on this error at: http://sqlalche.me/e/14/f405)
   ```
   
   ### What you think should happen instead
   
   The database upgrade should have succeeded.
   
   ### How to reproduce
   
   1. Install Airflow 1.8.0 and initialize the Airflow database.
   2. Create and execute some DAGs so that the `dag` table gets populated.
   3. Upgrade Airflow to 2.3.2 and upgrade the database via `airflow db upgrade`.
   
   ### Operating System
   
   macOS (OS X)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-ftp==2.1.2
   apache-airflow-providers-http==2.1.2
   apache-airflow-providers-imap==2.2.3
   apache-airflow-providers-sqlite==2.1.3
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   The sqlalchemy ProgrammingError exception occurred during the alembic database upgrade 127d2bf2dfa7 -> cc1e65623dc7. Searching for the target revision cc1e65623dc7 in the Airflow v2.3.2 sources leads to this database migration version script: https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py. There, we find the following imports from Airflow's own code (https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L31-L33):
   
   ```python
   from airflow import settings
   from airflow.compat.sqlalchemy import inspect
   from airflow.models import DagBag
   ```
   
   The code at https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L74-L81 then triggers the aforementioned ProgrammingError exception:
   
   ```python
   dagbag = DagBag(settings.DAGS_FOLDER)
   query = session.query(sa.func.count(TaskInstance.max_tries)).filter(TaskInstance.max_tries == -1)
   # Separate db query in batch to prevent loading entire table
   # into memory and cause out of memory error.
   while query.scalar():
       tis = session.query(TaskInstance).filter(TaskInstance.max_tries == -1).limit(BATCH_SIZE).all()
       for ti in tis:
           dag = dagbag.get_dag(ti.dag_id)
   ```
   
   What's happening? Recall that the error during the migration to database revision cc1e65623dc7 was `sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist`. The failure occurred because the call to `dagbag.get_dag(ti.dag_id)` tried to load a Dag model that includes the column `root_dag_id`, and that column didn't yet exist at Airflow database revision 127d2bf2dfa7 (from Airflow v1.8.0). The column `root_dag_id` is only added by a later version script, https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0045_1_10_7_add_root_dag_id_to_dag.py, from Airflow v1.10.7.
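   
   To make the mismatch concrete, here is a minimal sketch (my illustration; the connection URL is a placeholder) showing how one could use SQLAlchemy's inspector to confirm that the `dag` table at this old revision lacks the column the v2.3.2 ORM model expects:
   
   ```python
   import sqlalchemy as sa
   
   # Placeholder URL; point this at the Airflow metadata database.
   engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/airflow")
   
   inspector = sa.inspect(engine)
   dag_columns = {col["name"] for col in inspector.get_columns("dag")}
   
   # On a database still at revision 127d2bf2dfa7 (Airflow v1.8.0 era) this
   # prints False: root_dag_id is only created by the much later
   # 0045_1_10_7_add_root_dag_id_to_dag.py migration.
   print("root_dag_id" in dag_columns)
   ```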
   
   The problem with 0023_1_8_2_add_max_tries_column_to_task_instance.py is that it imports and uses DagBag to fetch Dag models. Those models had all the expected columns at the time this migration version script was developed, but that was no longer the case when I actually ran the db upgrade: by then the Dag model's columns had changed, so the model tried to load the column `root_dag_id` (since the upgrade was running under Airflow v2.3.2, which has this column), but the actual database, initialized by Airflow v1.8.0, didn't have this column yet!
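   
   For contrast, here is a hedged sketch of the safer pattern (my illustration, not the actual Airflow fix): a migration that needs to read or update row data can declare a lightweight inline table containing only the columns it touches, frozen at the schema of its own revision, instead of importing ORM models whose definitions track the current codebase:
   
   ```python
   import sqlalchemy as sa
   from alembic import op
   
   # Inline snapshot of just the task_instance columns this migration needs.
   # Unlike airflow.models.TaskInstance, this definition can never drift as
   # later releases add or remove columns.
   task_instance = sa.table(
       "task_instance",
       sa.column("task_id", sa.String),
       sa.column("dag_id", sa.String),
       sa.column("max_tries", sa.Integer),
   )
   
   def upgrade():
       conn = op.get_bind()
       # Plain Core query against the snapshot; no DagBag/DagModel, so nothing
       # ever SELECTs a column that doesn't exist at this revision.
       rows = conn.execute(
           sa.select(task_instance.c.dag_id, task_instance.c.task_id).where(
               task_instance.c.max_tries == -1
           )
       ).fetchall()
       for dag_id, task_id in rows:
           ...  # per-row fixup, using only data already in the database
   ```
   
   Defining such ad-hoc table objects inside the version script is the approach Alembic's own documentation suggests for data migrations, precisely because a script must keep working against the historical schema it was written for.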
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

