potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716076101
##########
File path: airflow/utils/db.py
##########
@@ -626,27 +627,74 @@ def check_migrations(timeout):
:param timeout: Timeout for the migration in seconds
:return: None
"""
- from alembic.runtime.migration import MigrationContext
+ ticker = 0
+ while True:
+ source_heads = get_source_heads()
+ db_heads = get_db_heads()
+ if source_heads == db_heads:
+ break
+ if ticker >= timeout:
+ raise TimeoutError(
+ f"There are still unapplied migrations after {ticker} seconds.
"
+ f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in
Source Code: {source_heads}"
+ )
+ ticker += 1
+ time.sleep(1)
+ log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+ """
+ Function to get the current migration head in the source code.
+
+ :return: List of migration head(s)
+ """
from alembic.script import ScriptDirectory
config = _get_alembic_config()
script_ = ScriptDirectory.from_config(config)
+ return set(script_.get_heads())
+
+
+def get_db_heads():
+ """
+ Function to get the current migration head in the database.
+
+ :return: List of migration head(s)
+ """
+ from alembic.runtime.migration import MigrationContext
+
with settings.engine.connect() as connection:
context = MigrationContext.configure(connection)
- ticker = 0
- while True:
- source_heads = set(script_.get_heads())
- db_heads = set(context.get_current_heads())
- if source_heads == db_heads:
- break
- if ticker >= timeout:
- raise TimeoutError(
- f"There are still unapplied migrations after {ticker}
seconds. "
- f"Migration Head(s) in DB: {db_heads} | Migration Head(s)
in Source Code: {source_heads}"
- )
- ticker += 1
- time.sleep(1)
- log.info('Waiting for migrations... %s second(s)', ticker)
+ return set(context.get_current_heads())
+
+
+def check_and_run_migrations(source_heads, db_heads):
+ """Check and run migrations if necessary. Only use in a tty"""
+ from rich import print as rich_print
+
+ source_heads = get_source_heads()
+ db_heads = get_db_heads()
+ if len(db_heads) < 0:
+ if sys.stdout.isatty():
+ # initialize database
+ print("DB: " + repr(settings.engine.url))
+ initdb()
+ print("DB Initialization done")
+ else:
+ rich_print("[red][bold]ERROR:[/bold] You need to initialize the
database")
Review comment:
I am a long-time fan of a hard exit whenever we find a serious inconsistency.
This not only prevents data corruption but is also much cleaner and
communicates the problem better - because you immediately see what the problem
is (the last log entry). You also prevent cases like endless loops. Those are
the worst kind of problem: you think your program is running, but it is
just hanging. When in doubt about whether you are in a consistent state, exit
immediately. This is the only way to make sure it WILL be noticed and acted on.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]