ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716502072



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. 
"
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in 
Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} 
seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) 
in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds 
to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))
+                    initdb()
+                    print("DB Initialization done")
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to initialize the database. 
"
+                "Run `airflow db init` to initialize the database"
+            )
+            sys.exit(1)
+    elif source_heads != db_heads:
+        if sys.stdout.isatty():
+            print()
+            question = "Please confirm database upgrade" " (or wait 4 seconds 
to skip it). Are you sure?"
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # upgrade database
+                    try:
+                        print("DB: " + repr(settings.engine.url))
+                        upgradedb()
+                        print("DB Upgrade done")
+                    except Exception as e:
+                        print(e)
+                        rich_print(
+                            "[red][bold]ERROR:[/bold] You still have unapplied 
migrations. "
+                            "You may need to reset the database by running 
`airflow db reset`"
+                        )
+                        sys.exit(1)
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to upgrade the database. "
+                "Run `airflow db upgrade` to upgrade"
+            )
+            sys.exit(1)

Review comment:
       We can reduce the duplication in these blocks a lot by doing something like this:
   
   ```suggestion
   def check_and_run_migrations():
       """Check and run migrations if necessary. Only use in a tty"""
       from rich import print as rich_print
   
       source_heads = get_source_heads()
       db_heads = get_db_heads()
       if len(db_heads) < 1:
           db_command = initdb
           command_name = "init"
           verb = "initialize"
       else:
           db_command = upgradedb
           command_name = "upgrade"
           verb = "upgrade"
           
       if sys.stdout.isatty():
           print()
           question = (
               f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
           )
           ans = helpers.prompt_with_timeout(question, timeout=4)
           if ans:
               # initialize database
               print("DB: " + repr(settings.engine.url))
               db_command()
               print(f"DB {verb} done")
       else:
           rich_print(
               f"[red][bold]ERROR:[/bold] You need to {verb} the database. Please run `airflow db {command_name}`"
           )
           sys.exit(1)
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to