potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r756031105
##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
:param timeout: Timeout for the migration in seconds
:return: None
"""
- from alembic.runtime.migration import MigrationContext
+ for ticker in range(timeout):
+ source_heads = get_source_heads()
+ db_heads = get_db_heads()
+ if source_heads == db_heads:
+ return
+ time.sleep(1)
+ log.info('Waiting for migrations... %s second(s)', ticker)
+ raise TimeoutError(
+ f"There are still unapplied migrations after {timeout} seconds. "
+ f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source
Code: {source_heads}"
+ )
+
+
+def get_source_heads() -> Set:
+ """
+ Function to get the current migration head in the source code.
+
+ :return: a set of migration head(s)
+ """
from alembic.script import ScriptDirectory
config = _get_alembic_config()
script_ = ScriptDirectory.from_config(config)
+ return set(script_.get_heads())
+
+
+def get_db_heads() -> Set:
+ """
+ Function to get the current migration head in the database.
+
+ :return: a set of migration head(s)
+ """
+ from alembic.runtime.migration import MigrationContext
+
with settings.engine.connect() as connection:
context = MigrationContext.configure(connection)
- ticker = 0
- while True:
- source_heads = set(script_.get_heads())
- db_heads = set(context.get_current_heads())
- if source_heads == db_heads:
- break
- if ticker >= timeout:
- raise TimeoutError(
- f"There are still unapplied migrations after {ticker}
seconds. "
- f"Migration Head(s) in DB: {db_heads} | Migration Head(s)
in Source Code: {source_heads}"
- )
- ticker += 1
- time.sleep(1)
- log.info('Waiting for migrations... %s second(s)', ticker)
+ return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+ """Check and run migrations if necessary. Only use in a tty"""
+ source_heads = get_source_heads()
+ db_heads = get_db_heads()
+ db_command = None
+ command_name = None
+ verb = None
+ if len(db_heads) < 1:
+ db_command = initdb
+ command_name = "init"
+ verb = "initialization"
+ elif source_heads != db_heads:
+ db_command = upgradedb
+ command_name = "upgrade"
+ verb = "upgrade"
+
+ if sys.stdout.isatty() and verb:
+ print()
+ question = f"Please confirm database {verb} (or wait 4 seconds to skip
it). Are you sure?"
+ try:
+ ans = helpers.prompt_with_timeout(question, timeout=4)
+ if ans:
+ try:
+ db_command()
+ print(f"DB {verb} done")
+ except Exception as error:
+ print(error)
+ print(
+ "You still have unapplied migrations. "
+ "You may need to reset the database by running
`airflow db reset`",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+ except AirflowException:
+ pass
+ elif source_heads != db_heads:
+ print(
+ f"ERROR: You need to {verb} the database. Please run `airflow db
{command_name}`", file=sys.stderr
Review comment:
Following the discussion in https://github.com/apache/airflow/issues/19784
and the scenario where Helm migration images might be different from the airflow
image, I think it would be good to add a hint about using the same version:
```suggestion
f"ERROR: You need to {verb} the database. Please run `airflow db
{command_name}`. Make sure the command is run using airflow version
{version}.", file=sys.stderr
```
(version should be imported above)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]